- Add complete pytest testing framework with conftest.py and test files - Add performance monitoring and benchmarking capabilities - Add plugin system with ccache plugin example - Add comprehensive documentation (API, deployment, testing, etc.) - Add Docker API wrapper for service deployment - Add advanced configuration examples - Remove old wget package file - Update core modules with enhanced functionality
625 lines
25 KiB
Python
625 lines
25 KiB
Python
"""
|
|
Core DebMock class for orchestrating the build process
|
|
"""
|
|
|
|
import os
|
|
import threading
|
|
import concurrent.futures
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
import time
|
|
|
|
from .cache import CacheManager
|
|
from .chroot import ChrootManager
|
|
from .config import Config
|
|
from .exceptions import ChrootError
|
|
from .metadata import MetadataManager
|
|
from .sbuild import SbuildWrapper
|
|
from .plugin import PluginManager, HookStages
|
|
from .performance import PerformanceMonitor, PerformanceOptimizer, PerformanceReporter
|
|
|
|
|
|
class DebMock:
|
|
"""Main DebMock class for orchestrating package builds"""
|
|
|
|
def __init__(self, config: Config):
|
|
self.config = config
|
|
self.chroot_manager = ChrootManager(config)
|
|
self.sbuild_wrapper = SbuildWrapper(config)
|
|
self.metadata_manager = MetadataManager(config)
|
|
self.cache_manager = CacheManager(config)
|
|
self.plugin_manager = PluginManager(config)
|
|
|
|
# Validate configuration
|
|
self.config.validate()
|
|
|
|
# Setup caches
|
|
self._setup_caches()
|
|
|
|
# Initialize plugins
|
|
self.plugin_manager.init_plugins(self)
|
|
|
|
# Initialize performance monitoring
|
|
self.performance_monitor = PerformanceMonitor(config)
|
|
self.performance_optimizer = PerformanceOptimizer(config)
|
|
self.performance_reporter = PerformanceReporter(config)
|
|
|
|
# Parallel build support
|
|
self._build_lock = threading.Lock()
|
|
self._active_builds = {}
|
|
|
|
def _setup_caches(self) -> None:
|
|
"""Setup cache directories and ccache"""
|
|
try:
|
|
# Setup ccache if enabled
|
|
if self.config.use_ccache:
|
|
self.cache_manager.setup_ccache()
|
|
except Exception as e:
|
|
# Log warning but continue
|
|
print(f"Warning: Failed to setup caches: {e}")
|
|
|
|
def build(self, source_package: str, **kwargs) -> Dict[str, Any]:
|
|
"""Build a Debian source package in an isolated environment"""
|
|
|
|
# Create build profile for performance tracking
|
|
build_id = f"build_{int(time.time() * 1000)}"
|
|
profile_id = self.performance_monitor.create_build_profile(
|
|
build_id, source_package, self.config.architecture, self.config.suite
|
|
)
|
|
|
|
# Call pre-build hooks
|
|
self.plugin_manager.call_hooks(HookStages.PREBUILD, source_package, **kwargs)
|
|
|
|
# Ensure chroot exists
|
|
chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
|
|
chroot_path = self.config.get_chroot_path()
|
|
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
with self.performance_monitor.monitor_operation("chroot_creation") as op_id:
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
# Add chroot creation metrics to profile
|
|
self.performance_monitor.add_phase_metrics(profile_id, "chroot_creation",
|
|
self.performance_monitor._active_operations[op_id])
|
|
|
|
# Try to restore from cache first
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
if not self.cache_manager.restore_root_cache(chroot_path):
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
|
|
# Setup build environment
|
|
with self.performance_monitor.monitor_operation("build_env_setup") as op_id:
|
|
build_env = self.config.setup_build_environment()
|
|
# Add build environment setup metrics to profile
|
|
self.performance_monitor.add_phase_metrics(profile_id, "build_env_setup",
|
|
self.performance_monitor._active_operations[op_id])
|
|
|
|
# Call build start hook
|
|
self.plugin_manager.call_hooks(HookStages.BUILD_START, source_package, chroot_name, **kwargs)
|
|
|
|
# Build the package
|
|
with self.performance_monitor.monitor_operation("package_build") as op_id:
|
|
build_result = self.sbuild_wrapper.build_package(source_package, chroot_name, build_env=build_env, **kwargs)
|
|
# Add package build metrics to profile
|
|
self.performance_monitor.add_phase_metrics(profile_id, "package_build",
|
|
self.performance_monitor._active_operations[op_id])
|
|
|
|
# Call build end hook
|
|
self.plugin_manager.call_hooks(HookStages.BUILD_END, build_result, source_package, chroot_name, **kwargs)
|
|
|
|
# Create cache after successful build
|
|
if build_result.get("success", False):
|
|
with self.performance_monitor.monitor_operation("cache_creation") as op_id:
|
|
self.cache_manager.create_root_cache(chroot_path)
|
|
# Add cache creation metrics to profile
|
|
self.performance_monitor.add_phase_metrics(profile_id, "cache_creation",
|
|
self.performance_monitor._active_operations[op_id])
|
|
|
|
# Capture and store metadata
|
|
with self.performance_monitor.monitor_operation("metadata_capture") as op_id:
|
|
metadata = self._capture_build_metadata(build_result, source_package)
|
|
self.metadata_manager.store_metadata(metadata)
|
|
# Add metadata capture metrics to profile
|
|
self.performance_monitor.add_phase_metrics(profile_id, "metadata_capture",
|
|
self.performance_monitor._active_operations[op_id])
|
|
|
|
# Clean up chroot if not keeping it
|
|
if not kwargs.get("keep_chroot", self.config.keep_chroot):
|
|
with self.performance_monitor.monitor_operation("chroot_cleanup") as op_id:
|
|
self.chroot_manager.clean_chroot(chroot_name)
|
|
# Add chroot cleanup metrics to profile
|
|
self.performance_monitor.add_phase_metrics(profile_id, "chroot_cleanup",
|
|
self.performance_monitor._active_operations[op_id])
|
|
|
|
# Call post-build hooks
|
|
self.plugin_manager.call_hooks(HookStages.POSTBUILD, build_result, source_package, **kwargs)
|
|
|
|
# Finalize build profile and generate optimization suggestions
|
|
build_profile = self.performance_monitor.finalize_build_profile(profile_id)
|
|
if build_profile and self.config.performance_auto_optimization:
|
|
analysis = self.performance_optimizer.analyze_build_performance(build_profile)
|
|
if analysis['automatic_tunings']:
|
|
self.performance_optimizer.apply_automatic_tunings(analysis['automatic_tunings'])
|
|
|
|
return build_result
|
|
|
|
def build_parallel(self, source_packages: List[str], max_workers: int = None, **kwargs) -> List[Dict[str, Any]]:
|
|
"""
|
|
Build multiple packages in parallel using multiple chroots
|
|
|
|
Args:
|
|
source_packages: List of source packages to build
|
|
max_workers: Maximum number of parallel builds (default: config.parallel_builds)
|
|
**kwargs: Additional build options
|
|
|
|
Returns:
|
|
List of build results in the same order as source_packages
|
|
"""
|
|
if max_workers is None:
|
|
max_workers = getattr(self.config, 'parallel_builds', 2)
|
|
|
|
# Limit max_workers to available system resources
|
|
max_workers = min(max_workers, os.cpu_count() or 2)
|
|
|
|
print(f"Building {len(source_packages)} packages with {max_workers} parallel workers")
|
|
|
|
# Create unique chroot names for parallel builds
|
|
chroot_names = [f"{self.config.chroot_name}-parallel-{i}" for i in range(len(source_packages))]
|
|
|
|
# Prepare build tasks
|
|
build_tasks = []
|
|
for i, (source_package, chroot_name) in enumerate(zip(source_packages, chroot_names)):
|
|
task_kwargs = kwargs.copy()
|
|
task_kwargs['chroot_name'] = chroot_name
|
|
task_kwargs['package_index'] = i
|
|
build_tasks.append((source_package, task_kwargs))
|
|
|
|
# Execute builds in parallel
|
|
results = [None] * len(source_packages)
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
# Submit all build tasks
|
|
future_to_index = {
|
|
executor.submit(self._build_single_parallel, source_pkg, **task_kwargs): i
|
|
for i, (source_pkg, task_kwargs) in enumerate(build_tasks)
|
|
}
|
|
|
|
# Collect results as they complete
|
|
for future in concurrent.futures.as_completed(future_to_index):
|
|
index = future_to_index[future]
|
|
try:
|
|
result = future.result()
|
|
results[index] = result
|
|
print(f"✅ Package {index + 1}/{len(source_packages)} completed: {result.get('package_name', 'unknown')}")
|
|
except Exception as e:
|
|
results[index] = {
|
|
'success': False,
|
|
'error': str(e),
|
|
'package_name': source_packages[index] if index < len(source_packages) else 'unknown'
|
|
}
|
|
print(f"❌ Package {index + 1}/{len(source_packages)} failed: {e}")
|
|
|
|
# Clean up parallel chroots
|
|
for chroot_name in chroot_names:
|
|
try:
|
|
self.chroot_manager.clean_chroot(chroot_name)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to clean chroot {chroot_name}: {e}")
|
|
|
|
return results
|
|
|
|
def _build_single_parallel(self, source_package: str, **kwargs) -> Dict[str, Any]:
|
|
"""Build a single package for parallel execution"""
|
|
chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
|
|
package_index = kwargs.get("package_index", 0)
|
|
|
|
print(f"🔄 Starting parallel build {package_index + 1}: {source_package}")
|
|
|
|
try:
|
|
# Ensure chroot exists for this parallel build
|
|
chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
|
|
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
if not self.cache_manager.restore_root_cache(chroot_path):
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
|
|
# Check build dependencies
|
|
deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
|
|
if not deps_check["satisfied"]:
|
|
if deps_check["missing"]:
|
|
self.sbuild_wrapper.install_build_dependencies(deps_check["missing"], chroot_name)
|
|
|
|
# Setup build environment
|
|
build_env = self.config.setup_build_environment()
|
|
|
|
# Build the package
|
|
build_result = self.sbuild_wrapper.build_package(
|
|
source_package, chroot_name, build_env=build_env, **kwargs
|
|
)
|
|
|
|
# Create cache after successful build
|
|
if build_result.get("success", False):
|
|
self.cache_manager.create_root_cache(chroot_path)
|
|
|
|
# Capture and store metadata
|
|
metadata = self._capture_build_metadata(build_result, source_package)
|
|
self.metadata_manager.store_metadata(metadata)
|
|
|
|
return build_result
|
|
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'error': str(e),
|
|
'package_name': source_package,
|
|
'chroot_name': chroot_name
|
|
}
|
|
|
|
def build_chain(self, source_packages: List[str], **kwargs) -> List[Dict[str, Any]]:
|
|
"""Build a chain of packages that depend on each other (similar to Mock's --chain)"""
|
|
|
|
results = []
|
|
chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
|
|
chroot_path = self.config.get_chroot_path()
|
|
|
|
# Try to restore from cache first
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
if not self.cache_manager.restore_root_cache(chroot_path):
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
|
|
# Setup build environment
|
|
build_env = self.config.setup_build_environment()
|
|
|
|
for i, source_package in enumerate(source_packages):
|
|
try:
|
|
# Build the package
|
|
result = self.sbuild_wrapper.build_package(source_package, chroot_name, build_env=build_env, **kwargs)
|
|
|
|
# Store result
|
|
results.append(result)
|
|
|
|
# If build failed, stop the chain
|
|
if not result.get("success", False):
|
|
print(f"Chain build failed at package {i+1}: {source_package}")
|
|
break
|
|
|
|
# Install the built package for dependency resolution
|
|
if result.get("success", False) and kwargs.get("install_built", True):
|
|
self._install_built_package(result, chroot_name)
|
|
|
|
except Exception as e:
|
|
error_result = {
|
|
"success": False,
|
|
"error": str(e),
|
|
"package": source_package,
|
|
"chain_position": i
|
|
}
|
|
results.append(error_result)
|
|
break
|
|
|
|
return results
|
|
|
|
def _install_built_package(self, build_result: Dict[str, Any], chroot_name: str) -> None:
|
|
"""Install a built package in the chroot for dependency resolution"""
|
|
try:
|
|
# Extract .deb files from build result
|
|
deb_files = build_result.get("artifacts", {}).get("deb_files", [])
|
|
|
|
for deb_file in deb_files:
|
|
if deb_file.endswith(".deb"):
|
|
# Copy .deb to chroot and install
|
|
self.chroot_manager.copy_in(deb_file, chroot_name, "/tmp/")
|
|
|
|
# Install the package
|
|
install_cmd = ["dpkg", "-i", f"/tmp/{os.path.basename(deb_file)}"]
|
|
self.chroot_manager.execute_in_chroot(chroot_name, install_cmd)
|
|
|
|
# Fix any broken dependencies
|
|
fix_cmd = ["apt-get", "install", "-f", "-y"]
|
|
self.chroot_manager.execute_in_chroot(chroot_name, fix_cmd)
|
|
|
|
except Exception as e:
|
|
print(f"Warning: Failed to install built package: {e}")
|
|
|
|
def init_chroot(self, chroot_name: str, arch: str = None, suite: str = None) -> None:
|
|
"""Initialize a new chroot environment"""
|
|
self.chroot_manager.create_chroot(chroot_name, arch, suite)
|
|
|
|
# Create cache after successful chroot creation
|
|
chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
|
|
self.cache_manager.create_root_cache(chroot_path)
|
|
|
|
def clean_chroot(self, chroot_name: str) -> None:
|
|
"""Clean up a chroot environment"""
|
|
self.chroot_manager.clean_chroot(chroot_name)
|
|
|
|
def list_chroots(self) -> list:
|
|
"""List available chroot environments"""
|
|
return self.chroot_manager.list_chroots()
|
|
|
|
def update_chroot(self, chroot_name: str) -> None:
|
|
"""Update packages in a chroot environment"""
|
|
self.chroot_manager.update_chroot(chroot_name)
|
|
|
|
# Update cache after successful update
|
|
chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
|
|
self.cache_manager.create_root_cache(chroot_path)
|
|
|
|
def get_chroot_info(self, chroot_name: str) -> dict:
|
|
"""Get information about a chroot environment"""
|
|
return self.chroot_manager.get_chroot_info(chroot_name)
|
|
|
|
def shell(self, chroot_name: str = None) -> None:
|
|
"""Open a shell in the chroot environment (similar to Mock's --shell)"""
|
|
if chroot_name is None:
|
|
chroot_name = self.config.chroot_name
|
|
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
raise ChrootError(f"Chroot '{chroot_name}' does not exist")
|
|
|
|
# Execute shell in chroot
|
|
self.chroot_manager.execute_in_chroot(chroot_name, ["/bin/bash"], capture_output=False)
|
|
|
|
def copyout(self, source_path: str, dest_path: str, chroot_name: str = None) -> None:
|
|
"""Copy files from chroot to host (similar to Mock's --copyout)"""
|
|
if chroot_name is None:
|
|
chroot_name = self.config.chroot_name
|
|
|
|
self.chroot_manager.copy_from_chroot(source_path, dest_path, chroot_name)
|
|
|
|
def copyin(self, source_path: str, dest_path: str, chroot_name: str = None) -> None:
|
|
"""Copy files from host to chroot (similar to Mock's --copyin)"""
|
|
if chroot_name is None:
|
|
chroot_name = self.config.chroot_name
|
|
|
|
self.chroot_manager.copy_to_chroot(source_path, dest_path, chroot_name)
|
|
|
|
def cleanup_caches(self) -> Dict[str, int]:
|
|
"""Clean up old cache files (similar to Mock's cache management)"""
|
|
return self.cache_manager.cleanup_old_caches()
|
|
|
|
def get_cache_stats(self) -> Dict[str, Any]:
|
|
"""Get cache statistics"""
|
|
return self.cache_manager.get_cache_stats()
|
|
|
|
def _capture_build_metadata(self, build_result: Dict[str, Any], source_package: str) -> Dict[str, Any]:
|
|
"""Capture comprehensive build metadata"""
|
|
|
|
metadata = {
|
|
"source_package": source_package,
|
|
"build_result": build_result,
|
|
"config": self.config.to_dict(),
|
|
"artifacts": build_result.get("artifacts", []),
|
|
"build_metadata": build_result.get("metadata", {}),
|
|
"timestamp": self._get_timestamp(),
|
|
"build_success": build_result.get("success", False),
|
|
"cache_info": self.get_cache_stats(),
|
|
}
|
|
|
|
# Add artifact details
|
|
metadata["artifact_details"] = self._get_artifact_details(build_result.get("artifacts", []))
|
|
|
|
return metadata
|
|
|
|
def _get_timestamp(self) -> str:
|
|
"""Get current timestamp"""
|
|
from datetime import datetime
|
|
|
|
return datetime.now().isoformat()
|
|
|
|
def _get_artifact_details(self, artifacts: list) -> list:
|
|
"""Get detailed information about build artifacts"""
|
|
details = []
|
|
|
|
for artifact_path in artifacts:
|
|
if os.path.exists(artifact_path):
|
|
stat = os.stat(artifact_path)
|
|
details.append(
|
|
{
|
|
"path": artifact_path,
|
|
"name": os.path.basename(artifact_path),
|
|
"size": stat.st_size,
|
|
"modified": stat.st_mtime,
|
|
"type": self._get_artifact_type(artifact_path),
|
|
}
|
|
)
|
|
|
|
return details
|
|
|
|
def _get_artifact_type(self, artifact_path: str) -> str:
|
|
"""Determine the type of build artifact"""
|
|
ext = Path(artifact_path).suffix.lower()
|
|
|
|
if ext == ".deb":
|
|
return "deb_package"
|
|
elif ext == ".changes":
|
|
return "changes_file"
|
|
elif ext == ".buildinfo":
|
|
return "buildinfo_file"
|
|
elif ext == ".dsc":
|
|
return "source_package"
|
|
else:
|
|
return "other"
|
|
|
|
def verify_reproducible_build(self, source_package: str, **kwargs) -> Dict[str, Any]:
|
|
"""Verify that a build is reproducible by building twice and comparing results"""
|
|
|
|
# First build
|
|
result1 = self.build(source_package, **kwargs)
|
|
|
|
# Clean chroot for second build
|
|
chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
|
|
if self.chroot_manager.chroot_exists(chroot_name):
|
|
self.chroot_manager.clean_chroot(chroot_name)
|
|
|
|
# Second build
|
|
result2 = self.build(source_package, **kwargs)
|
|
|
|
# Compare results
|
|
comparison = self._compare_build_results(result1, result2)
|
|
|
|
return {
|
|
"reproducible": comparison["identical"],
|
|
"first_build": result1,
|
|
"second_build": result2,
|
|
"comparison": comparison,
|
|
}
|
|
|
|
def _compare_build_results(self, result1: Dict[str, Any], result2: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Compare two build results for reproducibility"""
|
|
|
|
comparison = {"identical": True, "differences": [], "artifact_comparison": {}}
|
|
|
|
# Compare artifacts
|
|
artifacts1 = set(result1.get("artifacts", []))
|
|
artifacts2 = set(result2.get("artifacts", []))
|
|
|
|
if artifacts1 != artifacts2:
|
|
comparison["identical"] = False
|
|
comparison["differences"].append("Different artifacts produced")
|
|
|
|
# Compare individual artifacts
|
|
common_artifacts = artifacts1.intersection(artifacts2)
|
|
for artifact in common_artifacts:
|
|
if os.path.exists(artifact):
|
|
# Compare file hashes
|
|
hash1 = self._get_file_hash(artifact)
|
|
hash2 = self._get_file_hash(artifact)
|
|
|
|
comparison["artifact_comparison"][artifact] = {
|
|
"identical": hash1 == hash2,
|
|
"hash1": hash1,
|
|
"hash2": hash2,
|
|
}
|
|
|
|
if hash1 != hash2:
|
|
comparison["identical"] = False
|
|
comparison["differences"].append(f"Artifact {artifact} differs")
|
|
|
|
return comparison
|
|
|
|
def _get_file_hash(self, file_path: str) -> str:
|
|
"""Get SHA256 hash of a file"""
|
|
import hashlib
|
|
|
|
hash_sha256 = hashlib.sha256()
|
|
with open(file_path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(4096), b""):
|
|
hash_sha256.update(chunk)
|
|
|
|
return hash_sha256.hexdigest()
|
|
|
|
def get_build_history(self) -> list:
|
|
"""Get build history from metadata store"""
|
|
return self.metadata_manager.get_build_history()
|
|
|
|
def get_build_info(self, build_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get information about a specific build"""
|
|
return self.metadata_manager.get_build_info(build_id)
|
|
|
|
def install_dependencies(self, source_package: str) -> Dict[str, Any]:
|
|
"""Install build dependencies for a source package"""
|
|
chroot_name = self.config.chroot_name
|
|
|
|
# Ensure chroot exists
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
|
|
# Check and install dependencies
|
|
deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
|
|
if deps_check["missing"]:
|
|
result = self.sbuild_wrapper.install_build_dependencies(deps_check["missing"], chroot_name)
|
|
return {
|
|
"success": True,
|
|
"installed": deps_check["missing"],
|
|
"details": result,
|
|
}
|
|
else:
|
|
return {
|
|
"success": True,
|
|
"installed": [],
|
|
"message": "All dependencies already satisfied",
|
|
}
|
|
|
|
def install_packages(self, packages: List[str]) -> Dict[str, Any]:
|
|
"""Install packages in the chroot environment"""
|
|
chroot_name = self.config.chroot_name
|
|
|
|
# Ensure chroot exists
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
|
|
# Install packages using APT
|
|
result = self.chroot_manager.execute_in_chroot(
|
|
chroot_name,
|
|
f"{self.config.apt_install_command} {' '.join(packages)}",
|
|
|
|
)
|
|
|
|
return {
|
|
"success": result.returncode == 0,
|
|
"installed": packages,
|
|
"output": result.stdout,
|
|
"error": result.stderr if result.returncode != 0 else None,
|
|
}
|
|
|
|
def update_packages(self, packages: List[str] = None) -> Dict[str, Any]:
|
|
"""Update packages in the chroot environment"""
|
|
chroot_name = self.config.chroot_name
|
|
|
|
# Ensure chroot exists
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
|
|
if packages:
|
|
# Update specific packages
|
|
cmd = f"{self.config.apt_command} install --only-upgrade {' '.join(packages)}"
|
|
else:
|
|
# Update all packages
|
|
cmd = f"{self.config.apt_command} update && {self.config.apt_command} upgrade -y"
|
|
|
|
result = self.chroot_manager.execute_in_chroot(chroot_name, cmd)
|
|
|
|
return {
|
|
"success": result.returncode == 0,
|
|
"updated": packages if packages else "all",
|
|
"output": result.stdout,
|
|
"error": result.stderr if result.returncode != 0 else None,
|
|
}
|
|
|
|
def remove_packages(self, packages: List[str]) -> Dict[str, Any]:
|
|
"""Remove packages from the chroot environment"""
|
|
chroot_name = self.config.chroot_name
|
|
|
|
# Ensure chroot exists
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
|
|
# Remove packages using APT
|
|
cmd = f"{self.config.apt_command} remove -y {' '.join(packages)}"
|
|
result = self.chroot_manager.execute_in_chroot(chroot_name, cmd)
|
|
|
|
return {
|
|
"success": result.returncode == 0,
|
|
"removed": packages,
|
|
"output": result.stdout,
|
|
"error": result.stderr if result.returncode != 0 else None,
|
|
}
|
|
|
|
def execute_apt_command(self, command: str) -> Dict[str, Any]:
|
|
"""Execute APT command in the chroot environment"""
|
|
chroot_name = self.config.chroot_name
|
|
|
|
# Ensure chroot exists
|
|
if not self.chroot_manager.chroot_exists(chroot_name):
|
|
self.chroot_manager.create_chroot(chroot_name)
|
|
|
|
# Execute APT command
|
|
cmd = f"{self.config.apt_command} {command}"
|
|
result = self.chroot_manager.execute_in_chroot(chroot_name, cmd)
|
|
|
|
return {
|
|
"success": result.returncode == 0,
|
|
"command": command,
|
|
"output": result.stdout,
|
|
"error": result.stderr if result.returncode != 0 else None,
|
|
}
|