Add comprehensive testing framework, performance monitoring, and plugin system
Some checks failed
Build Deb-Mock Package / build (push) Failing after 1m9s
Lint Code / Lint All Code (push) Failing after 1s
Test Deb-Mock Build / test (push) Failing after 35s

- Add complete pytest testing framework with conftest.py and test files
- Add performance monitoring and benchmarking capabilities
- Add plugin system with ccache plugin example
- Add comprehensive documentation (API, deployment, testing, etc.)
- Add Docker API wrapper for service deployment
- Add advanced configuration examples
- Remove old wget package file
- Update core modules with enhanced functionality
robojerk 2025-08-19 20:49:32 -07:00
parent 4c0dcb2522
commit c51819c836
30 changed files with 11141 additions and 105 deletions
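For orientation, here is a minimal sketch of driving the new parallel-build command through click's test runner. The deb_mock.cli import path, the kebab-case command name, and the .dsc paths are assumptions inferred from the diff below, not verified against the package:

from click.testing import CliRunner
from deb_mock.cli import main  # assumed import path for the CLI group shown below

runner = CliRunner()
# Build two hypothetical source packages with two parallel workers
result = runner.invoke(main, ["build-parallel", "pkg-a.dsc", "pkg-b.dsc", "--max-workers", "2"])
print(result.exit_code, result.output)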

deb_mock/benchmarking.py (Normal file, 778 lines added)

@@ -0,0 +1,778 @@
"""
Advanced benchmarking system for deb-mock
"""
import sys
import time
import json
import os
import statistics
import logging
import psutil
from typing import Dict, List, Any, Optional, Callable, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from .exceptions import PerformanceError
@dataclass
class BenchmarkConfig:
"""Configuration for benchmarking"""
name: str
description: str
iterations: int
warmup_iterations: int
parallel_runs: int
timeout_seconds: int
collect_system_metrics: bool
collect_detailed_metrics: bool
output_format: str # json, html, csv
output_file: Optional[str]
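# Example (illustrative): building a custom BenchmarkConfig instead of using
# one of the predefined templates further down; every value here is an
# arbitrary sample, not a recommended default.
#
#     custom = BenchmarkConfig(
#         name="CI Benchmark",
#         description="Tuned for short CI runs",
#         iterations=10,
#         warmup_iterations=2,
#         parallel_runs=1,
#         timeout_seconds=600,
#         collect_system_metrics=True,
#         collect_detailed_metrics=False,
#         output_format="json",
#         output_file=None,
#     )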
@dataclass
class BenchmarkMetrics:
"""Metrics collected during benchmarking"""
timestamp: datetime
duration: float
cpu_percent: float
memory_mb: float
disk_io_read_mb: float
disk_io_write_mb: float
network_io_mb: float
chroot_size_mb: float
cache_hit_rate: float
parallel_efficiency: float
resource_utilization: float
# System-level metrics
system_cpu_percent: float
system_memory_percent: float
system_load_average: Tuple[float, float, float]
system_disk_usage_percent: float
system_network_connections: int
@dataclass
class BenchmarkResult:
"""Result of a benchmark run"""
benchmark_name: str
config: BenchmarkConfig
start_time: datetime
end_time: datetime
total_duration: float
iterations: int
successful_iterations: int
failed_iterations: int
# Performance statistics
durations: List[float]
average_duration: float
min_duration: float
max_duration: float
median_duration: float
standard_deviation: float
coefficient_of_variation: float
# Percentiles
percentiles: Dict[str, float]
# System impact
system_impact: Dict[str, float]
# Detailed metrics
metrics: List[BenchmarkMetrics]
# Analysis
analysis: Dict[str, Any]
recommendations: List[str]
# Metadata
system_info: Dict[str, Any]
benchmark_version: str
class BenchmarkRunner:
"""Advanced benchmark runner for deb-mock operations"""
def __init__(self, config):
self.config = config
self.logger = logging.getLogger(__name__)
# Benchmark history
self._benchmark_history = []
self._benchmark_results = {}
# System information
self._system_info = self._collect_system_info()
# Benchmark templates
self._benchmark_templates = self._load_benchmark_templates()
# Performance baselines
self._performance_baselines = {}
self._load_performance_baselines()
def _collect_system_info(self) -> Dict[str, Any]:
"""Collect comprehensive system information"""
try:
# CPU information
cpu_info = {
"count": psutil.cpu_count(logical=False),
"count_logical": psutil.cpu_count(logical=True),
"freq": psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None,
"architecture": os.uname().machine if hasattr(os, 'uname') else "unknown"
}
# Memory information
memory = psutil.virtual_memory()
memory_info = {
"total_gb": memory.total / (1024**3),
"available_gb": memory.available / (1024**3),
"percent": memory.percent
}
# Disk information
disk = psutil.disk_usage('/')
disk_info = {
"total_gb": disk.total / (1024**3),
"free_gb": disk.free / (1024**3),
"percent": disk.percent
}
# OS information
os_info = {
"platform": os.uname().sysname if hasattr(os, 'uname') else "unknown",
"release": os.uname().release if hasattr(os, 'uname') else "unknown",
"version": os.uname().version if hasattr(os, 'uname') else "unknown"
}
# Python information
python_info = {
"version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
"implementation": sys.implementation.name,
"platform": sys.platform
}
return {
"cpu": cpu_info,
"memory": memory_info,
"disk": disk_info,
"os": os_info,
"python": python_info,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"Failed to collect system info: {e}")
return {"error": str(e)}
def _load_benchmark_templates(self) -> Dict[str, BenchmarkConfig]:
"""Load predefined benchmark templates"""
templates = {
"quick": BenchmarkConfig(
name="Quick Benchmark",
description="Fast benchmark with minimal iterations",
iterations=5,
warmup_iterations=1,
parallel_runs=1,
timeout_seconds=300,
collect_system_metrics=True,
collect_detailed_metrics=False,
output_format="json",
output_file=None
),
"standard": BenchmarkConfig(
name="Standard Benchmark",
description="Standard benchmark with moderate iterations",
iterations=20,
warmup_iterations=3,
parallel_runs=2,
timeout_seconds=600,
collect_system_metrics=True,
collect_detailed_metrics=True,
output_format="html",
output_file=None
),
"comprehensive": BenchmarkConfig(
name="Comprehensive Benchmark",
description="Comprehensive benchmark with many iterations",
iterations=100,
warmup_iterations=10,
parallel_runs=4,
timeout_seconds=1800,
collect_system_metrics=True,
collect_detailed_metrics=True,
output_format="html",
output_file=None
),
"stress": BenchmarkConfig(
name="Stress Test",
description="Stress test with high load",
iterations=50,
warmup_iterations=5,
parallel_runs=8,
timeout_seconds=1200,
collect_system_metrics=True,
collect_detailed_metrics=True,
output_format="json",
output_file=None
)
}
return templates
def _load_performance_baselines(self):
"""Load performance baselines for comparison"""
baseline_file = os.path.join(getattr(self.config, 'performance_metrics_dir', './performance-metrics'), "baselines.json")
if os.path.exists(baseline_file):
try:
with open(baseline_file, 'r') as f:
self._performance_baselines = json.load(f)
self.logger.info("Loaded performance baselines for benchmarking")
except Exception as e:
self.logger.warning(f"Failed to load baselines: {e}")
def run_benchmark(self, benchmark_name: str, operation_func: Callable,
operation_args: Tuple = (), operation_kwargs: Dict = None,
config: Optional[BenchmarkConfig] = None) -> BenchmarkResult:
"""Run a benchmark for a specific operation"""
if operation_kwargs is None:
operation_kwargs = {}
# Use template if no config provided
if config is None:
if benchmark_name in self._benchmark_templates:
config = self._benchmark_templates[benchmark_name]
else:
config = self._benchmark_templates["standard"]
self.logger.info(f"Starting benchmark: {benchmark_name}")
self.logger.info(f"Configuration: {iterations} iterations, {parallel_runs} parallel runs")
start_time = datetime.now()
results = []
metrics_list = []
# Warmup runs
if config.warmup_iterations > 0:
self.logger.info(f"Running {config.warmup_iterations} warmup iterations")
for i in range(config.warmup_iterations):
try:
operation_func(*operation_args, **operation_kwargs)
except Exception as e:
self.logger.warning(f"Warmup iteration {i+1} failed: {e}")
# Main benchmark runs
self.logger.info(f"Running {config.iterations} benchmark iterations")
if config.parallel_runs > 1:
results = self._run_parallel_benchmark(operation_func, operation_args, operation_kwargs, config)
else:
results = self._run_sequential_benchmark(operation_func, operation_args, operation_kwargs, config)
# Collect system metrics if enabled
if config.collect_system_metrics:
metrics_list = self._collect_benchmark_metrics(results, config)
# Calculate statistics
durations = [r["duration"] for r in results if r["success"]]
successful_iterations = len(durations)
failed_iterations = len(results) - successful_iterations
if not durations:
raise PerformanceError("No successful benchmark iterations")
# Calculate performance statistics
stats = self._calculate_performance_statistics(durations)
# Calculate system impact
system_impact = self._calculate_system_impact(metrics_list) if metrics_list else {}
# Generate analysis and recommendations
analysis = self._analyze_benchmark_results(stats, system_impact)
recommendations = self._generate_benchmark_recommendations(analysis, stats)
# Create benchmark result
end_time = datetime.now()
total_duration = (end_time - start_time).total_seconds()
benchmark_result = BenchmarkResult(
benchmark_name=benchmark_name,
config=config,
start_time=start_time,
end_time=end_time,
total_duration=total_duration,
iterations=config.iterations,
successful_iterations=successful_iterations,
failed_iterations=failed_iterations,
durations=durations,
average_duration=stats["average"],
min_duration=stats["min"],
max_duration=stats["max"],
median_duration=stats["median"],
standard_deviation=stats["std_dev"],
coefficient_of_variation=stats["cv"],
percentiles=stats["percentiles"],
system_impact=system_impact,
metrics=metrics_list,
analysis=analysis,
recommendations=recommendations,
system_info=self._system_info,
benchmark_version="1.0.0"
)
# Store result
self._benchmark_results[benchmark_name] = benchmark_result
self._benchmark_history.append(benchmark_result)
# Save result
self._save_benchmark_result(benchmark_result)
self.logger.info(f"Benchmark completed: {benchmark_name}")
self.logger.info(f"Results: {successful_iterations}/{config.iterations} successful, "
f"avg duration: {stats['average']:.3f}s")
return benchmark_result
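# Usage sketch (illustrative): running the "quick" template against an
# arbitrary callable; any config object exposing the attributes read in this
# class (e.g. performance_metrics_dir) will do.
#
#     runner = BenchmarkRunner(config)
#     result = runner.run_benchmark("quick", operation_func=time.sleep, operation_args=(0.1,))
#     print(result.average_duration, result.recommendations)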
def _run_sequential_benchmark(self, operation_func: Callable, operation_args: Tuple,
operation_kwargs: Dict, config: BenchmarkConfig) -> List[Dict[str, Any]]:
"""Run benchmark iterations sequentially"""
results = []
for i in range(config.iterations):
self.logger.debug(f"Running iteration {i+1}/{config.iterations}")
try:
start_time = time.time()
result = operation_func(*operation_args, **operation_kwargs)
end_time = time.time()
iteration_result = {
"iteration": i + 1,
"success": True,
"duration": end_time - start_time,
"result": result,
"timestamp": datetime.now()
}
results.append(iteration_result)
except Exception as e:
self.logger.warning(f"Iteration {i+1} failed: {e}")
iteration_result = {
"iteration": i + 1,
"success": False,
"duration": 0,
"error": str(e),
"timestamp": datetime.now()
}
results.append(iteration_result)
return results
def _run_parallel_benchmark(self, operation_func: Callable, operation_args: Tuple,
operation_kwargs: Dict, config: BenchmarkConfig) -> List[Dict[str, Any]]:
"""Run benchmark iterations in parallel"""
results = []
def run_iteration(iteration_num):
try:
start_time = time.time()
result = operation_func(*operation_args, **operation_kwargs)
end_time = time.time()
return {
"iteration": iteration_num,
"success": True,
"duration": end_time - start_time,
"result": result,
"timestamp": datetime.now()
}
except Exception as e:
self.logger.warning(f"Iteration {iteration_num} failed: {e}")
return {
"iteration": iteration_num,
"success": False,
"duration": 0,
"error": str(e),
"timestamp": datetime.now()
}
# Use ThreadPoolExecutor for parallel execution
with ThreadPoolExecutor(max_workers=config.parallel_runs) as executor:
future_to_iteration = {
executor.submit(run_iteration, i + 1): i + 1
for i in range(config.iterations)
}
for future in as_completed(future_to_iteration):
result = future.result()
results.append(result)
# Sort results by iteration number
results.sort(key=lambda x: x["iteration"])
return results
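# Note: a ThreadPoolExecutor fits here because deb-mock operations are
# dominated by subprocess and I/O work, which releases the GIL; a purely
# CPU-bound operation_func would need a process pool to run in parallel.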
def _collect_benchmark_metrics(self, results: List[Dict[str, Any]],
config: BenchmarkConfig) -> List[BenchmarkMetrics]:
"""Collect system metrics during benchmarking"""
metrics_list = []
for result in results:
if not result["success"]:
continue
try:
# Collect system metrics
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
disk_io = psutil.disk_io_counters()
net_io = psutil.net_io_counters()
# Get load average if available
try:
load_avg = os.getloadavg()
except (OSError, AttributeError):
load_avg = (0.0, 0.0, 0.0)
# Get disk usage
disk_usage = psutil.disk_usage('/')
# Get network connections count
try:
net_connections = len(psutil.net_connections())
except (OSError, psutil.AccessDenied):
net_connections = 0
metrics = BenchmarkMetrics(
timestamp=result["timestamp"],
duration=result["duration"],
cpu_percent=cpu_percent,
memory_mb=memory.used / (1024 * 1024),
disk_io_read_mb=disk_io.read_bytes / (1024 * 1024) if disk_io else 0,
disk_io_write_mb=disk_io.write_bytes / (1024 * 1024) if disk_io else 0,
network_io_mb=(net_io.bytes_sent + net_io.bytes_recv) / (1024 * 1024) if net_io else 0,
chroot_size_mb=0, # Would need to be calculated from actual chroot
cache_hit_rate=0.0, # Would need to be calculated from cache metrics
parallel_efficiency=1.0, # Would need to be calculated
resource_utilization=0.0, # Would need to be calculated
system_cpu_percent=cpu_percent,
system_memory_percent=memory.percent,
system_load_average=load_avg,
system_disk_usage_percent=disk_usage.percent,
system_network_connections=net_connections
)
metrics_list.append(metrics)
except Exception as e:
self.logger.warning(f"Failed to collect metrics for iteration {result['iteration']}: {e}")
return metrics_list
def _calculate_performance_statistics(self, durations: List[float]) -> Dict[str, Any]:
"""Calculate comprehensive performance statistics"""
if not durations:
return {}
# Basic statistics
avg_duration = statistics.mean(durations)
min_duration = min(durations)
max_duration = max(durations)
median_duration = statistics.median(durations)
# Standard deviation and coefficient of variation
try:
std_dev = statistics.stdev(durations)
cv = std_dev / avg_duration if avg_duration > 0 else 0
except statistics.StatisticsError:
std_dev = 0
cv = 0
# Percentiles
sorted_durations = sorted(durations)
percentiles = {
"p10": sorted_durations[int(0.1 * len(sorted_durations))],
"p25": sorted_durations[int(0.25 * len(sorted_durations))],
"p50": sorted_durations[int(0.5 * len(sorted_durations))],
"p75": sorted_durations[int(0.75 * len(sorted_durations))],
"p90": sorted_durations[int(0.9 * len(sorted_durations))],
"p95": sorted_durations[int(0.95 * len(sorted_durations))],
"p99": sorted_durations[int(0.99 * len(sorted_durations))]
}
return {
"average": avg_duration,
"min": min_duration,
"max": max_duration,
"median": median_duration,
"std_dev": std_dev,
"cv": cv,
"percentiles": percentiles,
"durations": durations  # kept so _analyze_benchmark_results can scan for anomalies
}
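# Worked example (illustrative): for durations [1.0, 1.2, 1.1] the mean is
# 1.1s, the sample standard deviation is 0.1s, and the coefficient of
# variation is 0.1 / 1.1 ≈ 0.09, which the stability analysis below rates
# as "excellent" (cv < 0.1).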
def _calculate_system_impact(self, metrics_list: List[BenchmarkMetrics]) -> Dict[str, float]:
"""Calculate system impact during benchmarking"""
if not metrics_list:
return {}
# Calculate averages across all metrics
avg_cpu = statistics.mean(m.cpu_percent for m in metrics_list)
avg_memory = statistics.mean(m.memory_mb for m in metrics_list)
avg_disk_read = statistics.mean(m.disk_io_read_mb for m in metrics_list)
avg_disk_write = statistics.mean(m.disk_io_write_mb for m in metrics_list)
avg_network = statistics.mean(m.network_io_mb for m in metrics_list)
# Calculate peak values
peak_cpu = max(m.cpu_percent for m in metrics_list)
peak_memory = max(m.memory_mb for m in metrics_list)
return {
"avg_cpu_percent": avg_cpu,
"avg_memory_mb": avg_memory,
"avg_disk_read_mb": avg_disk_read,
"avg_disk_write_mb": avg_disk_write,
"avg_network_mb": avg_network,
"peak_cpu_percent": peak_cpu,
"peak_memory_mb": peak_memory
}
def _analyze_benchmark_results(self, stats: Dict[str, Any],
system_impact: Dict[str, float]) -> Dict[str, Any]:
"""Analyze benchmark results for insights"""
analysis = {
"performance_stability": "unknown",
"system_impact_level": "unknown",
"optimization_opportunities": [],
"anomalies": []
}
# Analyze performance stability
cv = stats.get("cv", 0)
if cv < 0.1:
analysis["performance_stability"] = "excellent"
elif cv < 0.2:
analysis["performance_stability"] = "good"
elif cv < 0.3:
analysis["performance_stability"] = "fair"
else:
analysis["performance_stability"] = "poor"
analysis["optimization_opportunities"].append("High performance variability detected")
# Analyze system impact
avg_cpu = system_impact.get("avg_cpu_percent", 0)
avg_memory = system_impact.get("avg_memory_mb", 0)
if avg_cpu < 30:
analysis["system_impact_level"] = "low"
analysis["optimization_opportunities"].append("CPU utilization is low, consider increasing parallelization")
elif avg_cpu < 70:
analysis["system_impact_level"] = "moderate"
else:
analysis["system_impact_level"] = "high"
analysis["optimization_opportunities"].append("High CPU utilization, consider reducing load")
if avg_memory > 2048: # 2GB
analysis["optimization_opportunities"].append("High memory usage, consider optimizing memory allocation")
# Detect anomalies (durations more than 2 standard deviations from the mean)
durations = stats.get("durations", [])
avg_duration = stats.get("average", 0)
std_dev = stats.get("std_dev", 0)
if durations and std_dev > 0:
for duration in durations:
if abs(duration - avg_duration) > 2 * std_dev:
analysis["anomalies"].append(f"Duration anomaly: {duration:.3f}s (avg: {avg_duration:.3f}s)")
return analysis
def _generate_benchmark_recommendations(self, analysis: Dict[str, Any],
stats: Dict[str, Any]) -> List[str]:
"""Generate actionable recommendations based on benchmark results"""
recommendations = []
# Performance stability recommendations
stability = analysis.get("performance_stability", "unknown")
if stability in ["fair", "poor"]:
recommendations.append("Investigate performance variability - check for external factors affecting performance")
recommendations.append("Consider running more iterations to get more stable results")
# System impact recommendations
impact_level = analysis.get("system_impact_level", "unknown")
if impact_level == "low":
recommendations.append("System resources are underutilized - consider increasing workload or parallelization")
elif impact_level == "high":
recommendations.append("System is under high load - consider reducing workload or optimizing operations")
# Optimization recommendations
for opportunity in analysis.get("optimization_opportunities", []):
recommendations.append(opportunity)
# General recommendations
if stats.get("cv", 0) > 0.2:
recommendations.append("High coefficient of variation suggests inconsistent performance - investigate root causes")
if len(recommendations) == 0:
recommendations.append("Performance is within acceptable parameters - continue monitoring")
return recommendations
def _save_benchmark_result(self, result: BenchmarkResult):
"""Save benchmark result to file"""
try:
metrics_dir = getattr(self.config, 'performance_metrics_dir', './performance-metrics')
os.makedirs(metrics_dir, exist_ok=True)
timestamp = result.start_time.strftime("%Y%m%d_%H%M%S")
filename = f"benchmark_{result.benchmark_name}_{timestamp}.json"
filepath = os.path.join(metrics_dir, filename)
# Convert to dict for JSON serialization; json.dump(default=str) below
# handles the datetime objects nested in the per-iteration metrics
result_dict = asdict(result)
result_dict["start_time"] = result.start_time.isoformat()
result_dict["end_time"] = result.end_time.isoformat()
with open(filepath, 'w') as f:
json.dump(result_dict, f, indent=2, default=str)
self.logger.info(f"Benchmark result saved: {filepath}")
except Exception as e:
self.logger.error(f"Failed to save benchmark result: {e}")
def compare_benchmarks(self, benchmark_names: List[str]) -> Dict[str, Any]:
"""Compare multiple benchmark results"""
if len(benchmark_names) < 2:
raise ValueError("Need at least 2 benchmark names for comparison")
comparison = {
"benchmarks": benchmark_names,
"comparison_date": datetime.now().isoformat(),
"results": {},
"analysis": {},
"recommendations": []
}
# Collect benchmark results
for name in benchmark_names:
if name in self._benchmark_results:
result = self._benchmark_results[name]
comparison["results"][name] = {
"average_duration": result.average_duration,
"min_duration": result.min_duration,
"max_duration": result.max_duration,
"standard_deviation": result.standard_deviation,
"coefficient_of_variation": result.coefficient_of_variation,
"successful_iterations": result.successful_iterations,
"total_iterations": result.iterations
}
# Perform comparison analysis
if len(comparison["results"]) >= 2:
comparison["analysis"] = self._analyze_benchmark_comparison(comparison["results"])
comparison["recommendations"] = self._generate_comparison_recommendations(comparison["analysis"])
return comparison
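# Usage sketch (illustrative): comparing two benchmarks previously registered
# under these names by run_benchmark():
#
#     comparison = runner.compare_benchmarks(["quick", "standard"])
#     print(comparison["analysis"]["fastest_benchmark"])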
def _analyze_benchmark_comparison(self, results: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze comparison between benchmark results"""
analysis = {
"fastest_benchmark": None,
"slowest_benchmark": None,
"most_stable_benchmark": None,
"least_stable_benchmark": None,
"performance_differences": {},
"stability_differences": {}
}
if len(results) < 2:
return analysis
# Find fastest and slowest
avg_durations = {name: data["average_duration"] for name, data in results.items()}
fastest = min(avg_durations, key=avg_durations.get)
slowest = max(avg_durations, key=avg_durations.get)
analysis["fastest_benchmark"] = fastest
analysis["slowest_benchmark"] = slowest
# Find most and least stable
cv_values = {name: data["coefficient_of_variation"] for name, data in results.items()}
most_stable = min(cv_values, key=cv_values.get)
least_stable = max(cv_values, key=cv_values.get)
analysis["most_stable_benchmark"] = most_stable
analysis["least_stable_benchmark"] = least_stable
# Calculate performance differences
fastest_avg = avg_durations[fastest]
for name, data in results.items():
if name != fastest:
diff_percent = ((data["average_duration"] - fastest_avg) / fastest_avg) * 100
analysis["performance_differences"][name] = {
"vs_fastest_percent": diff_percent,
"vs_fastest_seconds": data["average_duration"] - fastest_avg
}
# Calculate stability differences
most_stable_cv = cv_values[most_stable]
for name, data in results.items():
if name != most_stable:
cv_diff = data["coefficient_of_variation"] - most_stable_cv
analysis["stability_differences"][name] = {
"vs_most_stable_cv": cv_diff,
"stability_ratio": data["coefficient_of_variation"] / most_stable_cv
}
return analysis
def _generate_comparison_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
"""Generate recommendations based on benchmark comparison"""
recommendations = []
fastest = analysis.get("fastest_benchmark")
slowest = analysis.get("slowest_benchmark")
most_stable = analysis.get("most_stable_benchmark")
least_stable = analysis.get("least_stable_benchmark")
if fastest and slowest and fastest != slowest:
slowdown_percent = analysis["performance_differences"][slowest]["vs_fastest_percent"]
recommendations.append(f"Benchmark '{slowest}' is {slowdown_percent:.1f}% slower than '{fastest}' - investigate performance differences")
if most_stable and least_stable and most_stable != least_stable:
stability_ratio = analysis["stability_differences"][least_stable]["stability_ratio"]
recommendations.append(f"Benchmark '{least_stable}' is {stability_ratio:.2f}x less stable than '{most_stable}' - investigate variability causes")
# General recommendations
if len(analysis.get("performance_differences", {})) > 0:
recommendations.append("Consider using the fastest benchmark configuration for production")
if len(analysis.get("stability_differences", {})) > 0:
recommendations.append("Consider using the most stable benchmark configuration for critical operations")
return recommendations
def list_benchmarks(self) -> List[str]:
"""List all available benchmark templates"""
return list(self._benchmark_templates.keys())
def get_benchmark_result(self, benchmark_name: str) -> Optional[BenchmarkResult]:
"""Get a specific benchmark result"""
return self._benchmark_results.get(benchmark_name)
def get_benchmark_history(self) -> List[BenchmarkResult]:
"""Get all benchmark results"""
return self._benchmark_history.copy()
def clear_benchmark_history(self):
"""Clear benchmark history"""
self._benchmark_history.clear()
self._benchmark_results.clear()
self.logger.info("Benchmark history cleared")


@@ -5,10 +5,12 @@ Chroot management for deb-mock
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
-from typing import List
+from typing import List, Dict, Optional
from .exceptions import ChrootError
from .uid_manager import UIDManager
class ChrootManager:
@@ -16,6 +18,8 @@ class ChrootManager:
def __init__(self, config):
self.config = config
self._active_mounts = {} # Track active mounts per chroot
self.uid_manager = UIDManager(config)
def create_chroot(self, chroot_name: str, arch: str = None, suite: str = None) -> None:
"""Create a new chroot environment"""
@@ -30,6 +34,12 @@ class ChrootManager:
self._create_bootstrap_chroot(chroot_name)
else:
self._create_standard_chroot(chroot_name)
# Setup advanced mounts after chroot creation
self._setup_advanced_mounts(chroot_name)
# Setup UID/GID management
self._setup_chroot_users(chroot_name)
def _create_bootstrap_chroot(self, chroot_name: str) -> None:
"""
@@ -49,7 +59,7 @@
try:
# Create final chroot using debootstrap from within bootstrap
cmd = [
"debootstrap",
"/usr/sbin/debootstrap",
"--arch",
self.config.architecture,
self.config.suite,
@@ -94,7 +104,7 @@
# Run debootstrap
cmd = [
"debootstrap",
"/usr/sbin/debootstrap",
"--arch",
self.config.architecture,
self.config.suite,
@@ -108,7 +118,7 @@
raise ChrootError(
f"debootstrap failed: {result.stderr}",
chroot_name=chroot_name,
operation="debootstrap",
operation="/usr/sbin/debootstrap",
chroot_path=chroot_path,
)
@@ -205,7 +215,7 @@ preserve-environment=true
def _initialize_chroot(self, chroot_path: str, arch: str, suite: str) -> None:
"""Initialize chroot using debootstrap"""
cmd = [
"debootstrap",
"/usr/sbin/debootstrap",
"--arch",
arch,
"--variant=buildd",
@@ -487,3 +497,251 @@ preserve-environment=true
self.scrub_chroot(chroot_name)
except Exception as e:
print(f"Warning: Failed to scrub chroot '{chroot_name}': {e}")
def _setup_advanced_mounts(self, chroot_name: str) -> None:
"""Setup advanced mount points for the chroot"""
chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
# Initialize mount tracking for this chroot
self._active_mounts[chroot_name] = []
try:
# Setup standard system mounts
if self.config.mount_proc:
self._mount_proc(chroot_name, chroot_path)
if self.config.mount_sys:
self._mount_sys(chroot_name, chroot_path)
if self.config.mount_dev:
self._mount_dev(chroot_name, chroot_path)
if self.config.mount_devpts:
self._mount_devpts(chroot_name, chroot_path)
if self.config.mount_tmp:
self._mount_tmp(chroot_name, chroot_path)
# Setup custom bind mounts
for bind_mount in self.config.bind_mounts:
self._setup_bind_mount(chroot_name, bind_mount)
# Setup tmpfs mounts
for tmpfs_mount in self.config.tmpfs_mounts:
self._setup_tmpfs_mount(chroot_name, tmpfs_mount)
# Setup overlay mounts
for overlay_mount in self.config.overlay_mounts:
self._setup_overlay_mount(chroot_name, overlay_mount)
except Exception as e:
raise ChrootError(
f"Failed to setup advanced mounts: {e}",
chroot_name=chroot_name,
operation="mount_setup"
)
def _mount_proc(self, chroot_name: str, chroot_path: str) -> None:
"""Mount /proc in the chroot"""
proc_path = os.path.join(chroot_path, "proc")
if not os.path.exists(proc_path):
os.makedirs(proc_path, exist_ok=True)
try:
subprocess.run(["mount", "--bind", "/proc", proc_path], check=True)
self._active_mounts[chroot_name].append(("proc", proc_path))
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to mount /proc: {e}")
def _mount_sys(self, chroot_name: str, chroot_path: str) -> None:
"""Mount /sys in the chroot"""
sys_path = os.path.join(chroot_path, "sys")
if not os.path.exists(sys_path):
os.makedirs(sys_path, exist_ok=True)
try:
subprocess.run(["mount", "--bind", "/sys", sys_path], check=True)
self._active_mounts[chroot_name].append(("sys", sys_path))
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to mount /sys: {e}")
def _mount_dev(self, chroot_name: str, chroot_path: str) -> None:
"""Mount /dev in the chroot"""
dev_path = os.path.join(chroot_path, "dev")
if not os.path.exists(dev_path):
os.makedirs(dev_path, exist_ok=True)
try:
subprocess.run(["mount", "--bind", "/dev", dev_path], check=True)
self._active_mounts[chroot_name].append(("dev", dev_path))
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to mount /dev: {e}")
def _mount_devpts(self, chroot_name: str, chroot_path: str) -> None:
"""Mount /dev/pts in the chroot"""
devpts_path = os.path.join(chroot_path, "dev", "pts")
if not os.path.exists(devpts_path):
os.makedirs(devpts_path, exist_ok=True)
try:
subprocess.run(["mount", "-t", "devpts", "devpts", devpts_path], check=True)
self._active_mounts[chroot_name].append(("devpts", devpts_path))
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to mount /dev/pts: {e}")
def _mount_tmp(self, chroot_name: str, chroot_path: str) -> None:
"""Mount /tmp in the chroot"""
tmp_path = os.path.join(chroot_path, "tmp")
if not os.path.exists(tmp_path):
os.makedirs(tmp_path, exist_ok=True)
try:
# Use tmpfs for better performance if configured
if self.config.use_tmpfs:
subprocess.run([
"mount", "-t", "tmpfs", "-o", f"size={self.config.tmpfs_size}",
"tmpfs", tmp_path
], check=True)
self._active_mounts[chroot_name].append(("tmpfs", tmp_path))
else:
# Bind mount host /tmp
subprocess.run(["mount", "--bind", "/tmp", tmp_path], check=True)
self._active_mounts[chroot_name].append(("tmp", tmp_path))
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to mount /tmp: {e}")
def _setup_bind_mount(self, chroot_name: str, bind_mount: Dict[str, str]) -> None:
"""Setup a custom bind mount"""
host_path = bind_mount.get("host")
chroot_path = bind_mount.get("chroot")
options = bind_mount.get("options", "")
if not host_path or not chroot_path:
print(f"Warning: Invalid bind mount configuration: {bind_mount}")
return
# Create chroot mount point
full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
os.makedirs(full_chroot_path, exist_ok=True)
try:
mount_cmd = ["mount", "--bind"]
if options:
mount_cmd.extend(["-o", options])
mount_cmd.extend([host_path, full_chroot_path])
subprocess.run(mount_cmd, check=True)
self._active_mounts[chroot_name].append(("bind", full_chroot_path))
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to setup bind mount {host_path} -> {chroot_path}: {e}")
def _setup_tmpfs_mount(self, chroot_name: str, tmpfs_mount: Dict[str, str]) -> None:
"""Setup a tmpfs mount"""
chroot_path = tmpfs_mount.get("chroot")
size = tmpfs_mount.get("size", "100M")
options = tmpfs_mount.get("options", "")
if not chroot_path:
print(f"Warning: Invalid tmpfs mount configuration: {tmpfs_mount}")
return
# Create chroot mount point
full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
os.makedirs(full_chroot_path, exist_ok=True)
try:
mount_cmd = ["mount", "-t", "tmpfs", "-o", f"size={size}"]
if options:
mount_cmd[-1] += f",{options}"
mount_cmd.extend(["tmpfs", full_chroot_path])
subprocess.run(mount_cmd, check=True)
self._active_mounts[chroot_name].append(("tmpfs", full_chroot_path))
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to setup tmpfs mount {chroot_path}: {e}")
def _setup_overlay_mount(self, chroot_name: str, overlay_mount: Dict[str, str]) -> None:
"""Setup an overlay mount (requires overlayfs support)"""
lower_dir = overlay_mount.get("lower")
upper_dir = overlay_mount.get("upper")
work_dir = overlay_mount.get("work")
chroot_path = overlay_mount.get("chroot")
if not all([lower_dir, upper_dir, work_dir, chroot_path]):
print(f"Warning: Invalid overlay mount configuration: {overlay_mount}")
return
# Create chroot mount point
full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
os.makedirs(full_chroot_path, exist_ok=True)
try:
# Create work directory if it doesn't exist
os.makedirs(work_dir, exist_ok=True)
mount_cmd = [
"mount", "-t", "overlay", "overlay",
"-o", f"lowerdir={lower_dir},upperdir={upper_dir},workdir={work_dir}",
full_chroot_path
]
subprocess.run(mount_cmd, check=True)
self._active_mounts[chroot_name].append(("overlay", full_chroot_path))
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to setup overlay mount {chroot_path}: {e}")
def cleanup_mounts(self, chroot_name: str) -> None:
"""Clean up all mounts for a chroot"""
if chroot_name not in self._active_mounts:
return
for mount_type, mount_path in reversed(self._active_mounts[chroot_name]):
try:
subprocess.run(["umount", mount_path], check=True)
print(f"Unmounted {mount_type}: {mount_path}")
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to unmount {mount_type} {mount_path}: {e}")
# Clear the mount list
self._active_mounts[chroot_name] = []
def list_mounts(self, chroot_name: str) -> List[Dict[str, str]]:
"""List all active mounts for a chroot"""
if chroot_name not in self._active_mounts:
return []
mounts = []
for mount_type, mount_path in self._active_mounts[chroot_name]:
mounts.append({
"type": mount_type,
"path": mount_path,
"chroot": chroot_name
})
return mounts
def _setup_chroot_users(self, chroot_name: str) -> None:
"""Setup users and permissions in the chroot"""
chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
try:
# Create the build user
self.uid_manager.create_chroot_user(chroot_path)
# Copy host users if configured
if hasattr(self.config, 'copy_host_users'):
for username in self.config.copy_host_users:
self.uid_manager.copy_host_user(chroot_path, username)
# Setup chroot permissions
self.uid_manager.setup_chroot_permissions(chroot_path)
except Exception as e:
raise ChrootError(
f"Failed to setup chroot users: {e}",
chroot_name=chroot_name,
operation="user_setup"
)
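# Example (illustrative) mount configuration consumed by the setup methods
# above; the keys match what _setup_bind_mount, _setup_tmpfs_mount and
# _setup_overlay_mount read, but every path and size is a placeholder:
#
#     bind_mounts = [{"host": "/var/cache/apt", "chroot": "/var/cache/apt", "options": "rw"}]
#     tmpfs_mounts = [{"chroot": "/build", "size": "1G", "options": "noatime"}]
#     overlay_mounts = [{
#         "lower": "/srv/base-rootfs",
#         "upper": "/srv/overlay/upper",
#         "work": "/srv/overlay/work",
#         "chroot": "/mnt/overlay",
#     }]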


@@ -4,6 +4,7 @@ Command-line interface for deb-mock
"""
import sys
import os
import click
@@ -130,6 +131,108 @@ def build(
click.echo("Build completed successfully")
@main.command()
@click.argument("source_packages", nargs=-1, type=click.Path(exists=True))
@click.option("--chroot", help="Chroot environment to use")
@click.option("--max-workers", type=int, help="Maximum number of parallel workers")
@click.option("--arch", help="Target architecture")
@click.option("--output-dir", "-o", type=click.Path(), help="Output directory for build artifacts")
@click.option("--keep-chroot", is_flag=True, help="Keep chroots after build (for debugging)")
@click.option("--no-check", is_flag=True, help="Skip running tests during build")
@click.option("--offline", is_flag=True, help="Build in offline mode (no network access)")
@click.option("--build-timeout", type=int, help="Build timeout in seconds")
@click.option("--force-arch", help="Force target architecture")
@click.option("--cleanup-after", is_flag=True, help="Clean chroots after build")
@click.option("--no-cleanup-after", is_flag=True, help="Don't clean chroots after build")
@click.pass_context
@handle_exception
def build_parallel(
ctx,
source_packages,
chroot,
max_workers,
arch,
output_dir,
keep_chroot,
no_check,
offline,
build_timeout,
force_arch,
cleanup_after,
no_cleanup_after,
):
"""
Build multiple Debian source packages in parallel using separate chroots.
SOURCE_PACKAGES: One or more paths to .dsc files or source package directories
"""
if not source_packages:
click.echo("Error: No source packages specified", err=True)
sys.exit(1)
if len(source_packages) == 1:
click.echo("Warning: Only one package specified, consider using 'build' instead", err=True)
deb_mock = DebMock(ctx.obj["config"])
# Override config with command line options
if chroot:
ctx.obj["config"].chroot_name = chroot
if arch:
ctx.obj["config"].architecture = arch
if output_dir:
ctx.obj["config"].output_dir = output_dir
if keep_chroot:
ctx.obj["config"].keep_chroot = True
if build_timeout:
ctx.obj["config"].parallel_build_timeout = build_timeout
if force_arch:
ctx.obj["config"].architecture = force_arch
if cleanup_after:
ctx.obj["config"].parallel_build_cleanup = True
if no_cleanup_after:
ctx.obj["config"].parallel_build_cleanup = False
# Build options
build_kwargs = {}
if no_check:
build_kwargs["no_check"] = True
if offline:
build_kwargs["offline"] = True
click.echo(f"Building {len(source_packages)} packages in parallel...")
try:
results = deb_mock.build_parallel(
list(source_packages),
max_workers=max_workers,
**build_kwargs
)
# Display results summary
successful = sum(1 for r in results if r.get("success", False))
failed = len(results) - successful
click.echo(f"\n=== Parallel Build Results ===")
click.echo(f"Total packages: {len(results)}")
click.echo(f"Successful: {successful}")
click.echo(f"Failed: {failed}")
if failed > 0:
click.echo(f"\nFailed packages:")
for i, result in enumerate(results):
if not result.get("success", False):
click.echo(f" {i+1}. {result.get('package_name', 'unknown')}: {result.get('error', 'Unknown error')}")
sys.exit(1)
else:
click.echo(f"\n✅ All packages built successfully!")
except Exception as e:
click.echo(f"Error during parallel build: {e}", err=True)
sys.exit(1)
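# Example invocation (illustrative; assumes the console script is installed
# as `deb-mock` and click's default dash-separated command naming):
#
#     deb-mock build-parallel ./pkg-a.dsc ./pkg-b.dsc --max-workers 4 -o ./out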
@main.command()
@click.argument("source_packages", nargs=-1, type=click.Path(exists=True))
@click.option("--chroot", help="Chroot environment to use")
@@ -614,5 +717,721 @@ def debug_config(ctx, expand):
click.echo(f" {plugin_name}: {plugin_config}")
@main.command()
@click.argument("chroot_name")
@click.pass_context
@handle_exception
def list_mounts(ctx, chroot_name):
"""List all active mounts for a chroot"""
deb_mock = DebMock(ctx.obj["config"])
try:
mounts = deb_mock.chroot_manager.list_mounts(chroot_name)
if not mounts:
click.echo(f"No active mounts found for chroot '{chroot_name}'")
return
click.echo(f"Active mounts for chroot '{chroot_name}':")
for mount in mounts:
click.echo(f" {mount['type']}: {mount['path']}")
except Exception as e:
click.echo(f"Error listing mounts: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("chroot_name")
@click.pass_context
@handle_exception
def cleanup_mounts(ctx, chroot_name):
"""Clean up all mounts for a chroot"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo(f"Cleaning up mounts for chroot '{chroot_name}'...")
deb_mock.chroot_manager.cleanup_mounts(chroot_name)
click.echo(f"✅ Mounts cleaned up for chroot '{chroot_name}'")
except Exception as e:
click.echo(f"Error cleaning up mounts: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("chroot_name")
@click.option("--mount-proc/--no-mount-proc", default=True, help="Mount /proc in chroot")
@click.option("--mount-sys/--no-mount-sys", default=True, help="Mount /sys in chroot")
@click.option("--mount-dev/--no-mount-dev", default=True, help="Mount /dev in chroot")
@click.option("--mount-devpts/--no-mount-devpts", default=True, help="Mount /dev/pts in chroot")
@click.option("--mount-tmp/--no-mount-tmp", default=True, help="Mount /tmp in chroot")
@click.option("--mount-home/--no-mount-home", default=False, help="Mount /home in chroot")
@click.option("--use-tmpfs", is_flag=True, help="Use tmpfs for /tmp mount")
@click.option("--tmpfs-size", default="2G", help="Size for tmpfs mount")
@click.pass_context
@handle_exception
def setup_mounts(
ctx,
chroot_name,
mount_proc,
mount_sys,
mount_dev,
mount_devpts,
mount_tmp,
mount_home,
use_tmpfs,
tmpfs_size,
):
"""Setup advanced mount points for a chroot"""
deb_mock = DebMock(ctx.obj["config"])
# Update config with mount options
ctx.obj["config"].mount_proc = mount_proc
ctx.obj["config"].mount_sys = mount_sys
ctx.obj["config"].mount_dev = mount_dev
ctx.obj["config"].mount_devpts = mount_devpts
ctx.obj["config"].mount_tmp = mount_tmp
ctx.obj["config"].mount_home = mount_home
ctx.obj["config"].use_tmpfs = use_tmpfs
ctx.obj["config"].tmpfs_size = tmpfs_size
try:
click.echo(f"Setting up advanced mounts for chroot '{chroot_name}'...")
# Clean up existing mounts first
deb_mock.chroot_manager.cleanup_mounts(chroot_name)
# Setup new mounts
deb_mock.chroot_manager._setup_advanced_mounts(chroot_name)
click.echo(f"✅ Advanced mounts setup complete for chroot '{chroot_name}'")
# Show current mounts
mounts = deb_mock.chroot_manager.list_mounts(chroot_name)
if mounts:
click.echo(f"\nActive mounts:")
for mount in mounts:
click.echo(f" {mount['type']}: {mount['path']}")
except Exception as e:
click.echo(f"Error setting up mounts: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("chroot_name")
@click.argument("host_path", type=click.Path(exists=True))
@click.argument("chroot_path")
@click.option("--options", help="Mount options (e.g., ro,noexec)")
@click.pass_context
@handle_exception
def bind_mount(ctx, chroot_name, host_path, chroot_path, options):
"""Add a custom bind mount to a chroot"""
deb_mock = DebMock(ctx.obj["config"])
try:
# Create bind mount configuration
bind_mount_config = {
"host": host_path,
"chroot": chroot_path,
"options": options or ""
}
click.echo(f"Adding bind mount {host_path} -> {chroot_path} to chroot '{chroot_name}'...")
# Setup the bind mount
deb_mock.chroot_manager._setup_bind_mount(chroot_name, bind_mount_config)
click.echo(f"✅ Bind mount added successfully")
except Exception as e:
click.echo(f"Error adding bind mount: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("chroot_name")
@click.argument("chroot_path")
@click.option("--size", default="100M", help="Size for tmpfs mount")
@click.option("--options", help="Additional mount options")
@click.pass_context
@handle_exception
def tmpfs_mount(ctx, chroot_name, chroot_path, size, options):
"""Add a tmpfs mount to a chroot"""
deb_mock = DebMock(ctx.obj["config"])
try:
# Create tmpfs mount configuration
tmpfs_mount_config = {
"chroot": chroot_path,
"size": size,
"options": options or ""
}
click.echo(f"Adding tmpfs mount {chroot_path} (size: {size}) to chroot '{chroot_name}'...")
# Setup the tmpfs mount
deb_mock.chroot_manager._setup_tmpfs_mount(chroot_name, tmpfs_mount_config)
click.echo(f"✅ Tmpfs mount added successfully")
except Exception as e:
click.echo(f"Error adding tmpfs mount: {e}", err=True)
sys.exit(1)
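# Example invocations for the mount commands above (illustrative paths,
# assuming a `deb-mock` console script):
#
#     deb-mock setup-mounts my-chroot --use-tmpfs --tmpfs-size 4G
#     deb-mock bind-mount my-chroot /var/cache/apt /var/cache/apt --options ro
#     deb-mock tmpfs-mount my-chroot /build --size 1G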
@main.command()
@click.argument("chroot_name")
@click.pass_context
@handle_exception
def user_info(ctx, chroot_name):
"""Show UID/GID information for a chroot"""
deb_mock = DebMock(ctx.obj["config"])
try:
user_info = deb_mock.chroot_manager.uid_manager.get_user_info()
click.echo(f"=== UID/GID Information for chroot '{chroot_name}' ===")
click.echo(f"Current user: {user_info['current_user']} (UID: {user_info['current_uid']}, GID: {user_info['current_gid']})")
click.echo(f"Chroot user: {user_info['chroot_user']} (UID: {user_info['chroot_uid']}, GID: {user_info['chroot_gid']})")
click.echo(f"Chroot group: {user_info['chroot_group']}")
# Check if chroot user exists
chroot_path = os.path.join(ctx.obj["config"].chroot_dir, chroot_name)
if os.path.exists(chroot_path):
user_valid = deb_mock.chroot_manager.uid_manager.validate_chroot_user(chroot_path)
click.echo(f"Chroot user configured: {'✅ Yes' if user_valid else '❌ No'}")
else:
click.echo("Chroot does not exist")
except Exception as e:
click.echo(f"Error getting user info: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("chroot_name")
@click.argument("username")
@click.pass_context
@handle_exception
def copy_host_user(ctx, chroot_name, username):
"""Copy a user from the host system to a chroot"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo(f"Copying host user '{username}' to chroot '{chroot_name}'...")
chroot_path = os.path.join(ctx.obj["config"].chroot_dir, chroot_name)
if not os.path.exists(chroot_path):
click.echo(f"Error: Chroot '{chroot_name}' does not exist", err=True)
sys.exit(1)
deb_mock.chroot_manager.uid_manager.copy_host_user(chroot_path, username)
click.echo(f"✅ Successfully copied host user '{username}' to chroot")
except Exception as e:
click.echo(f"Error copying host user: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("chroot_name")
@click.pass_context
@handle_exception
def setup_users(ctx, chroot_name):
"""Setup users and permissions for a chroot"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo(f"Setting up users and permissions for chroot '{chroot_name}'...")
chroot_path = os.path.join(ctx.obj["config"].chroot_dir, chroot_name)
if not os.path.exists(chroot_path):
click.echo(f"Error: Chroot '{chroot_name}' does not exist", err=True)
sys.exit(1)
# Setup chroot users
deb_mock.chroot_manager._setup_chroot_users(chroot_name)
click.echo(f"✅ Users and permissions setup complete for chroot '{chroot_name}'")
except Exception as e:
click.echo(f"Error setting up users: {e}", err=True)
sys.exit(1)
@main.command()
@click.pass_context
@handle_exception
def plugin_info(ctx):
"""Show information about loaded plugins"""
deb_mock = DebMock(ctx.obj["config"])
try:
plugin_info = deb_mock.plugin_manager.get_plugin_info()
click.echo("=== Plugin Information ===")
click.echo(f"Total plugins configured: {plugin_info['total_plugins']}")
click.echo(f"Loaded plugins: {', '.join(plugin_info['loaded_plugins']) if plugin_info['loaded_plugins'] else 'None'}")
click.echo(f"Available hook stages: {', '.join(plugin_info['available_stages']) if plugin_info['available_stages'] else 'None'}")
click.echo(f"Plugin directory: {plugin_info['plugin_dir']}")
click.echo(f"API version: {plugin_info['api_version']}")
except Exception as e:
click.echo(f"Error getting plugin info: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("stage")
@click.pass_context
@handle_exception
def list_hooks(ctx, stage):
"""List hooks registered for a specific stage"""
deb_mock = DebMock(ctx.obj["config"])
try:
hooks = deb_mock.plugin_manager.get_hooks(stage)
if not hooks:
click.echo(f"No hooks registered for stage '{stage}'")
return
click.echo(f"=== Hooks for stage '{stage}' ===")
for i, hook in enumerate(hooks, 1):
click.echo(f"{i}. {hook.__name__} ({hook.__module__})")
except Exception as e:
click.echo(f"Error listing hooks: {e}", err=True)
sys.exit(1)
@main.command()
@click.pass_context
@handle_exception
def list_stages(ctx):
"""List all available hook stages"""
deb_mock = DebMock(ctx.obj["config"])
try:
stages = deb_mock.plugin_manager.list_stages()
if not stages:
click.echo("No hook stages available")
return
click.echo("=== Available Hook Stages ===")
for stage in stages:
click.echo(f"- {stage}")
except Exception as e:
click.echo(f"Error listing stages: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("chroot_name")
@click.pass_context
@handle_exception
def chroot_info(ctx, chroot_name):
"""Show information about a chroot"""
deb_mock = DebMock(ctx.obj["config"])
try:
info = deb_mock.sbuild_wrapper.get_chroot_info(chroot_name)
click.echo(f"=== Chroot Information: {chroot_name} ===")
click.echo(f"Status: {info['status']}")
click.echo(f"Architecture: {info['architecture'] or 'Unknown'}")
click.echo(f"Distribution: {info['distribution'] or 'Unknown'}")
if 'package_count' in info:
click.echo(f"Package count: {info['package_count']}")
except Exception as e:
click.echo(f"Error getting chroot info: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("chroot_name")
@click.pass_context
@handle_exception
def update_chroot(ctx, chroot_name):
"""Update a chroot to ensure it's current"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo(f"Updating chroot '{chroot_name}'...")
deb_mock.sbuild_wrapper.update_chroot(chroot_name)
click.echo(f"✅ Chroot '{chroot_name}' updated successfully")
except Exception as e:
click.echo(f"Error updating chroot: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("source_package", type=click.Path(exists=True))
@click.option("--chroot", help="Chroot environment to use")
@click.pass_context
@handle_exception
def check_deps(ctx, source_package, chroot):
"""Check build dependencies for a source package"""
deb_mock = DebMock(ctx.obj["config"])
try:
if chroot:
ctx.obj["config"].chroot_name = chroot
click.echo(f"Checking build dependencies for {source_package}...")
deps = deb_mock.sbuild_wrapper.check_dependencies(source_package)
if deps["satisfied"]:
click.echo("✅ All build dependencies are satisfied")
else:
click.echo("❌ Build dependencies are not satisfied")
if deps["missing"]:
click.echo(f"Missing dependencies: {', '.join(deps['missing'])}")
if deps["conflicts"]:
click.echo(f"Conflicting dependencies: {', '.join(deps['conflicts'])}")
except Exception as e:
click.echo(f"Error checking dependencies: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("dependencies", nargs=-1)
@click.option("--chroot", help="Chroot environment to use")
@click.pass_context
@handle_exception
def install_deps(ctx, dependencies, chroot):
"""Install build dependencies in a chroot"""
if not dependencies:
click.echo("Error: No dependencies specified", err=True)
sys.exit(1)
deb_mock = DebMock(ctx.obj["config"])
try:
if chroot:
ctx.obj["config"].chroot_name = chroot
click.echo(f"Installing build dependencies: {', '.join(dependencies)}")
deb_mock.sbuild_wrapper.install_build_dependencies(list(dependencies))
click.echo("✅ Build dependencies installed successfully")
except Exception as e:
click.echo(f"Error installing dependencies: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("source_package", type=click.Path(exists=True))
@click.option("--chroot", help="Chroot environment to use")
@click.option("--output-dir", "-o", type=click.Path(), help="Output directory for build artifacts")
@click.option("--verbose", is_flag=True, help="Verbose output")
@click.option("--debug", is_flag=True, help="Debug output")
@click.option("--keep-chroot", is_flag=True, help="Keep chroot after build")
@click.pass_context
@handle_exception
def build_with_sbuild(ctx, source_package, chroot, output_dir, verbose, debug, keep_chroot):
"""Build a Debian source package using sbuild"""
deb_mock = DebMock(ctx.obj["config"])
try:
# Override config with command line options
if chroot:
ctx.obj["config"].chroot_name = chroot
if output_dir:
ctx.obj["config"].output_dir = output_dir
if verbose:
ctx.obj["config"].verbose = True
if debug:
ctx.obj["config"].debug = True
if keep_chroot:
ctx.obj["config"].keep_chroot = True
click.echo(f"Building {source_package} with sbuild...")
# Check dependencies first
deps = deb_mock.sbuild_wrapper.check_dependencies(source_package)
if not deps["satisfied"]:
click.echo("⚠️ Build dependencies not satisfied. Attempting to install...")
if deps["missing"]:
deb_mock.sbuild_wrapper.install_build_dependencies(deps["missing"])
# Build the package
result = deb_mock.sbuild_wrapper.build_package(source_package)
if result["success"]:
click.echo("✅ Package built successfully!")
click.echo(f"Output directory: {result['output_dir']}")
if result["artifacts"]:
click.echo("Build artifacts:")
for artifact in result["artifacts"]:
click.echo(f" {artifact}")
else:
click.echo("❌ Package build failed")
sys.exit(1)
except Exception as e:
click.echo(f"Error building package: {e}", err=True)
sys.exit(1)
@main.command()
@click.pass_context
@handle_exception
def performance_summary(ctx):
"""Show performance summary and statistics"""
deb_mock = DebMock(ctx.obj["config"])
try:
summary = deb_mock.performance_monitor.get_performance_summary()
if not summary:
click.echo("No performance data available yet")
return
click.echo("=== Performance Summary ===")
click.echo(f"Total Operations: {summary.get('total_operations', 0)}")
click.echo(f"Total Duration: {summary.get('total_duration', 0):.2f}s")
click.echo(f"Average Duration: {summary.get('average_duration', 0):.2f}s")
click.echo(f"Active Operations: {summary.get('active_operations', 0)}")
# Operation statistics
if 'operation_stats' in summary:
click.echo("\n=== Operation Statistics ===")
for op_name, stats in summary['operation_stats'].items():
click.echo(f"{op_name}:")
click.echo(f" Count: {stats['count']}")
click.echo(f" Avg Duration: {stats['avg_duration']:.2f}s")
click.echo(f" Min Duration: {stats['min_duration']:.2f}s")
click.echo(f" Max Duration: {stats['max_duration']:.2f}s")
# System statistics
if 'system_stats' in summary:
click.echo("\n=== System Statistics ===")
for key, value in summary['system_stats'].items():
click.echo(f"{key}: {value:.2f}")
except Exception as e:
click.echo(f"Error getting performance summary: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("operation_name")
@click.option("--iterations", "-i", type=int, default=3, help="Number of benchmark iterations")
@click.option("--function", "-f", help="Function to benchmark (e.g., 'build', 'chroot_creation')")
@click.pass_context
@handle_exception
def benchmark(ctx, operation_name, iterations, function):
"""Benchmark an operation multiple times"""
deb_mock = DebMock(ctx.obj["config"])
try:
if not function:
click.echo("Error: Please specify a function to benchmark with --function", err=True)
sys.exit(1)
# Get the function to benchmark
if hasattr(deb_mock, function):
operation_func = getattr(deb_mock, function)
else:
click.echo(f"Error: Function '{function}' not found", err=True)
sys.exit(1)
click.echo(f"Benchmarking {operation_name} with {iterations} iterations...")
result = deb_mock.performance_monitor.benchmark_operation(
operation_name, operation_func, iterations
)
click.echo(f"\n=== Benchmark Results for {operation_name} ===")
click.echo(f"Iterations: {result['iterations']}")
click.echo(f"Average Duration: {result['average_duration']:.2f}s")
click.echo(f"Min Duration: {result['min_duration']:.2f}s")
click.echo(f"Max Duration: {result['max_duration']:.2f}s")
click.echo(f"Variance: {result['variance']:.4f}")
except Exception as e:
click.echo(f"Error during benchmarking: {e}", err=True)
sys.exit(1)
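# Example invocation (illustrative; `list_chroots` is a hypothetical stand-in,
# since --function is resolved via getattr on the DebMock instance):
#
#     deb-mock benchmark chroot-listing --function list_chroots --iterations 5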
@main.command()
@click.option("--output-file", "-o", type=click.Path(), help="Output file for the report")
@click.pass_context
@handle_exception
def performance_report(ctx, output_file):
"""Generate a comprehensive performance report"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo("Generating performance report...")
report_file = deb_mock.performance_reporter.generate_performance_report(
deb_mock.performance_monitor, output_file
)
click.echo(f"✅ Performance report generated: {report_file}")
except Exception as e:
click.echo(f"Error generating performance report: {e}", err=True)
sys.exit(1)
@main.command()
@click.argument("build_id")
@click.option("--output-file", "-o", type=click.Path(), help="Output file for the report")
@click.pass_context
@handle_exception
def build_profile_report(ctx, build_id, output_file):
"""Generate a detailed build profile report"""
deb_mock = DebMock(ctx.obj["config"])
try:
# Find the build profile
profile = None
for profile_id, prof in deb_mock.performance_monitor._build_profiles.items():
if prof.build_id == build_id:
profile = prof
break
if not profile:
click.echo(f"Error: Build profile with ID '{build_id}' not found", err=True)
sys.exit(1)
click.echo(f"Generating build profile report for {profile.package_name}...")
report_file = deb_mock.performance_reporter.generate_build_profile_report(
profile, output_file
)
click.echo(f"✅ Build profile report generated: {report_file}")
except Exception as e:
click.echo(f"Error generating build profile report: {e}", err=True)
sys.exit(1)
@main.command()
@click.pass_context
@handle_exception
def performance_analysis(ctx):
"""Analyze performance and generate optimization suggestions"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo("Analyzing performance data...")
# Get all build profiles
profiles = list(deb_mock.performance_monitor._build_profiles.values())
if not profiles:
click.echo("No build profiles available for analysis")
return
click.echo(f"Found {len(profiles)} build profiles for analysis")
# Analyze each profile
for i, profile in enumerate(profiles, 1):
click.echo(f"\n=== Analysis {i}: {profile.package_name} ===")
analysis = deb_mock.performance_optimizer.analyze_build_performance(profile)
click.echo(f"Performance Score: {analysis['score']}/100")
if analysis['suggestions']:
click.echo("\nOptimization Suggestions:")
for suggestion in analysis['suggestions']:
click.echo(f"{suggestion}")
if analysis['automatic_tunings']:
click.echo("\nAutomatic Tuning Recommendations:")
for tuning in analysis['automatic_tunings']:
click.echo(f"{tuning['reason']}")
click.echo(f" Current: {tuning.get('current', 'N/A')}")
click.echo(f" Suggested: {tuning.get('suggested', 'N/A')}")
if analysis['manual_recommendations']:
click.echo("\nManual Optimization Recommendations:")
for recommendation in analysis['manual_recommendations']:
click.echo(f"{recommendation}")
click.echo("\n✅ Performance analysis completed")
except Exception as e:
click.echo(f"Error during performance analysis: {e}", err=True)
sys.exit(1)
@main.command()
@click.option("--auto-apply", is_flag=True, help="Automatically apply optimization tunings")
@click.pass_context
@handle_exception
def optimize(ctx, auto_apply):
"""Apply performance optimizations based on analysis"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo("Applying performance optimizations...")
# Get all build profiles
profiles = list(deb_mock.performance_monitor._build_profiles.values())
if not profiles:
click.echo("No build profiles available for optimization")
return
total_tunings = 0
applied_tunings = 0
for profile in profiles:
analysis = deb_mock.performance_optimizer.analyze_build_performance(profile)
total_tunings += len(analysis['automatic_tunings'])
if auto_apply and analysis['automatic_tunings']:
results = deb_mock.performance_optimizer.apply_automatic_tunings(
analysis['automatic_tunings']
)
applied_tunings += len(results['applied'])
if results['failed']:
click.echo(f"⚠️ Some tunings failed for {profile.package_name}")
click.echo("\n=== Optimization Summary ===")
click.echo(f"Total tunings available: {total_tunings}")
click.echo(f"Tunings applied: {applied_tunings}")
if auto_apply:
click.echo("✅ Automatic optimization completed")
else:
click.echo(" Use --auto-apply to automatically apply optimizations")
except Exception as e:
click.echo(f"Error during optimization: {e}", err=True)
sys.exit(1)
@main.command()
@click.option("--output-file", "-o", type=click.Path(), help="Output file for metrics export")
@click.pass_context
@handle_exception
def export_metrics(ctx, output_file):
"""Export performance metrics to a file"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo("Exporting performance metrics...")
export_file = deb_mock.performance_monitor.export_metrics(output_file)
click.echo(f"✅ Performance metrics exported to: {export_file}")
except Exception as e:
click.echo(f"Error exporting metrics: {e}", err=True)
sys.exit(1)
@main.command()
@click.pass_context
@handle_exception
def cleanup_metrics(ctx):
"""Clean up old performance metrics"""
deb_mock = DebMock(ctx.obj["config"])
try:
click.echo("Cleaning up old performance metrics...")
deb_mock.performance_monitor.cleanup_old_metrics()
click.echo("✅ Old performance metrics cleaned up")
except Exception as e:
click.echo(f"Error cleaning up metrics: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()
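The new performance commands hang off the same `main` click group as the rest of the CLI, so they can be smoke-tested in-process with click's `CliRunner`. A minimal sketch, assuming the group populates `ctx.obj["config"]` itself and that command names follow click's default underscore-to-dash conversion:

from click.testing import CliRunner

runner = CliRunner()

# Run the analysis command and inspect its output
result = runner.invoke(main, ["performance-analysis"])
print(result.exit_code, result.output)

# Write a report to a hypothetical output path
result = runner.invoke(main, ["performance-report", "-o", "perf-report.html"])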


@@ -59,7 +59,57 @@ class Config:
# Parallel builds
self.parallel_jobs = kwargs.get("parallel_jobs", 4)
self.parallel_compression = kwargs.get("parallel_compression", True)
# Advanced parallel build support
self.parallel_builds = kwargs.get("parallel_builds", 2) # Number of parallel chroots
self.parallel_chroot_prefix = kwargs.get("parallel_chroot_prefix", "parallel")
self.parallel_build_timeout = kwargs.get("parallel_build_timeout", 3600) # seconds
self.parallel_build_cleanup = kwargs.get("parallel_build_cleanup", True)
# Advanced mount management
self.advanced_mounts = kwargs.get("advanced_mounts", {})
self.bind_mounts = kwargs.get("bind_mounts", [])
self.tmpfs_mounts = kwargs.get("tmpfs_mounts", [])
self.overlay_mounts = kwargs.get("overlay_mounts", [])
self.mount_options = kwargs.get("mount_options", {})
# Mount isolation and security
self.mount_proc = kwargs.get("mount_proc", True)
self.mount_sys = kwargs.get("mount_sys", True)
self.mount_dev = kwargs.get("mount_dev", True)
self.mount_devpts = kwargs.get("mount_devpts", True)
self.mount_tmp = kwargs.get("mount_tmp", True)
self.mount_home = kwargs.get("mount_home", False)
# Advanced chroot features
self.use_namespaces = kwargs.get("use_namespaces", False)
self.uid_mapping = kwargs.get("uid_mapping", None)
self.gid_mapping = kwargs.get("gid_mapping", None)
self.capabilities = kwargs.get("capabilities", [])
self.seccomp_profile = kwargs.get("seccomp_profile", None)
# UID/GID management
self.chroot_user = kwargs.get("chroot_user", "build")
self.chroot_group = kwargs.get("chroot_group", "build")
self.chroot_uid = kwargs.get("chroot_uid", 1000)
self.chroot_gid = kwargs.get("chroot_gid", 1000)
self.use_host_user = kwargs.get("use_host_user", False)
self.copy_host_users = kwargs.get("copy_host_users", [])
self.preserve_uid_gid = kwargs.get("preserve_uid_gid", True)
# Plugin system
self.plugins = kwargs.get("plugins", [])
self.plugin_conf = kwargs.get("plugin_conf", {})
self.plugin_dir = kwargs.get("plugin_dir", "/usr/share/deb-mock/plugins")
# Performance monitoring and optimization
self.enable_performance_monitoring = kwargs.get("enable_performance_monitoring", True)
self.performance_metrics_dir = kwargs.get("performance_metrics_dir", "./performance-metrics")
self.performance_retention_days = kwargs.get("performance_retention_days", 30)
self.performance_auto_optimization = kwargs.get("performance_auto_optimization", False)
self.performance_benchmark_iterations = kwargs.get("performance_benchmark_iterations", 3)
self.performance_reporting = kwargs.get("performance_reporting", True)
# Network and proxy
self.use_host_resolv = kwargs.get("use_host_resolv", True)
self.http_proxy = kwargs.get("http_proxy", None)
@@ -124,10 +174,6 @@ class Config:
self.apt_command = kwargs.get("apt_command", "apt-get")
self.apt_install_command = kwargs.get("apt_install_command", "apt-get install -y")
@classmethod
def from_file(cls, config_path: str) -> "Config":
"""Load configuration from a YAML file"""
@@ -179,6 +225,42 @@ class Config:
"tmpfs_size": self.tmpfs_size,
"parallel_jobs": self.parallel_jobs,
"parallel_compression": self.parallel_compression,
"parallel_builds": self.parallel_builds,
"parallel_chroot_prefix": self.parallel_chroot_prefix,
"parallel_build_timeout": self.parallel_build_timeout,
"parallel_build_cleanup": self.parallel_build_cleanup,
"advanced_mounts": self.advanced_mounts,
"bind_mounts": self.bind_mounts,
"tmpfs_mounts": self.tmpfs_mounts,
"overlay_mounts": self.overlay_mounts,
"mount_options": self.mount_options,
"mount_proc": self.mount_proc,
"mount_sys": self.mount_sys,
"mount_dev": self.mount_dev,
"mount_devpts": self.mount_devpts,
"mount_tmp": self.mount_tmp,
"mount_home": self.mount_home,
"use_namespaces": self.use_namespaces,
"uid_mapping": self.uid_mapping,
"gid_mapping": self.gid_mapping,
"capabilities": self.capabilities,
"seccomp_profile": self.seccomp_profile,
"chroot_user": self.chroot_user,
"chroot_group": self.chroot_group,
"chroot_uid": self.chroot_uid,
"chroot_gid": self.chroot_gid,
"use_host_user": self.use_host_user,
"copy_host_users": self.copy_host_users,
"preserve_uid_gid": self.preserve_uid_gid,
"plugins": self.plugins,
"plugin_conf": self.plugin_conf,
"plugin_dir": self.plugin_dir,
"enable_performance_monitoring": self.enable_performance_monitoring,
"performance_metrics_dir": self.performance_metrics_dir,
"performance_retention_days": self.performance_retention_days,
"performance_auto_optimization": self.performance_auto_optimization,
"performance_benchmark_iterations": self.performance_benchmark_iterations,
"performance_reporting": self.performance_reporting,
"use_host_resolv": self.use_host_resolv,
"http_proxy": self.http_proxy,
"https_proxy": self.https_proxy,
@@ -229,6 +311,7 @@ class Config:
# Check suite
valid_suites = [
"trixie", # Debian 13+ (trixie) - required for OSTree support
"bookworm",
"sid",
"bullseye",

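All of the new knobs flow through `Config(**kwargs)`, so they can be exercised from Python as well as from YAML. A minimal sketch with illustrative values (the `ccache_enable` key follows the `<plugin>_enable` convention used by the plugin manager below):

from deb_mock.config import Config

cfg = Config(
    parallel_builds=4,                    # four concurrent chroots
    parallel_build_timeout=1800,          # give up on a build after 30 minutes
    chroot_user="build",
    chroot_uid=1000,
    plugins=["ccache"],
    plugin_conf={"ccache_enable": True},
    enable_performance_monitoring=True,
)
cfg.validate()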

@@ -0,0 +1,36 @@
# Debian Trixie (Debian 13) - AMD64
# Equivalent to Mock's fedora-39-x86_64 config
# Debian 13+ (trixie) has the required OSTree version for bootc support
description: "Debian Trixie (Debian 13) - AMD64"
chroot_name: "debian-trixie-amd64"
architecture: "amd64"
suite: "trixie"
mirror: "http://deb.debian.org/debian/"
# Build environment
build_env:
DEB_BUILD_OPTIONS: "parallel=4,nocheck"
DEB_BUILD_PROFILES: "nocheck"
DEB_CFLAGS_SET: "-O2"
DEB_CXXFLAGS_SET: "-O2"
DEB_LDFLAGS_SET: "-Wl,-z,defs"
# Build options
build_options:
- "--verbose"
- "--no-run-lintian"
# Chroot configuration
chroot_dir: "/var/lib/deb-mock/chroots"
chroot_config_dir: "/etc/schroot/chroot.d"
# sbuild configuration
sbuild_config: "/etc/sbuild/sbuild.conf"
sbuild_log_dir: "/var/log/sbuild"
# Output configuration
output_dir: "./output"
metadata_dir: "./metadata"
keep_chroot: false
verbose: false
debug: false
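A profile file like this can be loaded with the `Config.from_file` classmethod shown in the previous diff; the path below is illustrative:

cfg = Config.from_file("debian-trixie-amd64.yaml")  # hypothetical location
assert cfg.suite == "trixie"
assert cfg.architecture == "amd64"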


@@ -3,8 +3,11 @@ Core DebMock class for orchestrating the build process
"""
import os
import threading
import concurrent.futures
from pathlib import Path
from typing import Any, Dict, List, Optional
import time
from .cache import CacheManager
from .chroot import ChrootManager
@@ -12,6 +15,8 @@ from .config import Config
from .exceptions import ChrootError
from .metadata import MetadataManager
from .sbuild import SbuildWrapper
from .plugin import PluginManager, HookStages
from .performance import PerformanceMonitor, PerformanceOptimizer, PerformanceReporter
class DebMock:
@@ -23,12 +28,25 @@ class DebMock:
self.sbuild_wrapper = SbuildWrapper(config)
self.metadata_manager = MetadataManager(config)
self.cache_manager = CacheManager(config)
self.plugin_manager = PluginManager(config)
# Validate configuration
self.config.validate()
# Setup caches
self._setup_caches()
# Initialize plugins
self.plugin_manager.init_plugins(self)
# Initialize performance monitoring
self.performance_monitor = PerformanceMonitor(config)
self.performance_optimizer = PerformanceOptimizer(config)
self.performance_reporter = PerformanceReporter(config)
# Parallel build support
self._build_lock = threading.Lock()
self._active_builds = {}
def _setup_caches(self) -> None:
"""Setup cache directories and ccache"""
@@ -43,42 +61,199 @@ class DebMock:
def build(self, source_package: str, **kwargs) -> Dict[str, Any]:
"""Build a Debian source package in an isolated environment"""
# Create build profile for performance tracking
build_id = f"build_{int(time.time() * 1000)}"
profile_id = self.performance_monitor.create_build_profile(
build_id, source_package, self.config.architecture, self.config.suite
)
# Call pre-build hooks
self.plugin_manager.call_hooks(HookStages.PREBUILD, source_package, **kwargs)
# Ensure chroot exists
chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
chroot_path = self.config.get_chroot_path()
        if not self.chroot_manager.chroot_exists(chroot_name):
            # Try to restore from cache first, creating the chroot only on a miss
            if not self.cache_manager.restore_root_cache(chroot_path):
                with self.performance_monitor.monitor_operation("chroot_creation") as op_id:
                    self.chroot_manager.create_chroot(chroot_name)
                # Add chroot creation metrics to profile
                self.performance_monitor.add_phase_metrics(profile_id, "chroot_creation",
                    self.performance_monitor._active_operations[op_id])
# Check build dependencies
deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
if not deps_check["satisfied"]:
# Try to install missing dependencies
if deps_check["missing"]:
self.sbuild_wrapper.install_build_dependencies(deps_check["missing"], chroot_name)
# Setup build environment
with self.performance_monitor.monitor_operation("build_env_setup") as op_id:
build_env = self.config.setup_build_environment()
# Add build environment setup metrics to profile
self.performance_monitor.add_phase_metrics(profile_id, "build_env_setup",
self.performance_monitor._active_operations[op_id])
# Call build start hook
self.plugin_manager.call_hooks(HookStages.BUILD_START, source_package, chroot_name, **kwargs)
# Build the package
with self.performance_monitor.monitor_operation("package_build") as op_id:
build_result = self.sbuild_wrapper.build_package(source_package, chroot_name, build_env=build_env, **kwargs)
# Add package build metrics to profile
self.performance_monitor.add_phase_metrics(profile_id, "package_build",
self.performance_monitor._active_operations[op_id])
# Call build end hook
self.plugin_manager.call_hooks(HookStages.BUILD_END, build_result, source_package, chroot_name, **kwargs)
# Create cache after successful build
if build_result.get("success", False):
self.cache_manager.create_root_cache(chroot_path)
with self.performance_monitor.monitor_operation("cache_creation") as op_id:
self.cache_manager.create_root_cache(chroot_path)
# Add cache creation metrics to profile
self.performance_monitor.add_phase_metrics(profile_id, "cache_creation",
self.performance_monitor._active_operations[op_id])
# Capture and store metadata
with self.performance_monitor.monitor_operation("metadata_capture") as op_id:
metadata = self._capture_build_metadata(build_result, source_package)
self.metadata_manager.store_metadata(metadata)
# Add metadata capture metrics to profile
self.performance_monitor.add_phase_metrics(profile_id, "metadata_capture",
self.performance_monitor._active_operations[op_id])
# Clean up chroot if not keeping it
if not kwargs.get("keep_chroot", self.config.keep_chroot):
with self.performance_monitor.monitor_operation("chroot_cleanup") as op_id:
self.chroot_manager.clean_chroot(chroot_name)
# Add chroot cleanup metrics to profile
self.performance_monitor.add_phase_metrics(profile_id, "chroot_cleanup",
self.performance_monitor._active_operations[op_id])
# Call post-build hooks
self.plugin_manager.call_hooks(HookStages.POSTBUILD, build_result, source_package, **kwargs)
# Finalize build profile and generate optimization suggestions
build_profile = self.performance_monitor.finalize_build_profile(profile_id)
if build_profile and self.config.performance_auto_optimization:
analysis = self.performance_optimizer.analyze_build_performance(build_profile)
if analysis['automatic_tunings']:
self.performance_optimizer.apply_automatic_tunings(analysis['automatic_tunings'])
return build_result
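Taken end to end, the instrumented `build()` can be driven in a few lines; the profile path and package name below are illustrative:

from deb_mock.config import Config
from deb_mock.core import DebMock

mock = DebMock(Config.from_file("debian-trixie-amd64.yaml"))
result = mock.build("hello_2.10-3.dsc")
if result.get("success"):
    print("build ok:", result.get("artifacts"))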
def build_parallel(self, source_packages: List[str], max_workers: int = None, **kwargs) -> List[Dict[str, Any]]:
"""
Build multiple packages in parallel using multiple chroots
Args:
source_packages: List of source packages to build
max_workers: Maximum number of parallel builds (default: config.parallel_builds)
**kwargs: Additional build options
Returns:
List of build results in the same order as source_packages
"""
if max_workers is None:
max_workers = getattr(self.config, 'parallel_builds', 2)
# Limit max_workers to available system resources
max_workers = min(max_workers, os.cpu_count() or 2)
print(f"Building {len(source_packages)} packages with {max_workers} parallel workers")
# Create unique chroot names for parallel builds
chroot_names = [f"{self.config.chroot_name}-parallel-{i}" for i in range(len(source_packages))]
# Prepare build tasks
build_tasks = []
for i, (source_package, chroot_name) in enumerate(zip(source_packages, chroot_names)):
task_kwargs = kwargs.copy()
task_kwargs['chroot_name'] = chroot_name
task_kwargs['package_index'] = i
build_tasks.append((source_package, task_kwargs))
# Execute builds in parallel
results = [None] * len(source_packages)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all build tasks
future_to_index = {
executor.submit(self._build_single_parallel, source_pkg, **task_kwargs): i
for i, (source_pkg, task_kwargs) in enumerate(build_tasks)
}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_index):
index = future_to_index[future]
try:
result = future.result()
results[index] = result
print(f"✅ Package {index + 1}/{len(source_packages)} completed: {result.get('package_name', 'unknown')}")
except Exception as e:
results[index] = {
'success': False,
'error': str(e),
'package_name': source_packages[index] if index < len(source_packages) else 'unknown'
}
print(f"❌ Package {index + 1}/{len(source_packages)} failed: {e}")
# Clean up parallel chroots
for chroot_name in chroot_names:
try:
self.chroot_manager.clean_chroot(chroot_name)
except Exception as e:
print(f"Warning: Failed to clean chroot {chroot_name}: {e}")
return results
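Continuing the sketch above, `build_parallel` fans the same flow out across workers; the package names are placeholders, and results come back in input order as the docstring promises:

results = mock.build_parallel(
    ["pkg-a_1.0-1.dsc", "pkg-b_2.1-1.dsc", "pkg-c_0.9-1.dsc"],
    max_workers=2,
)
for res in results:
    status = "ok" if res.get("success") else f"failed: {res.get('error')}"
    print(res.get("package_name", "unknown"), status)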
def _build_single_parallel(self, source_package: str, **kwargs) -> Dict[str, Any]:
"""Build a single package for parallel execution"""
chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
package_index = kwargs.get("package_index", 0)
print(f"🔄 Starting parallel build {package_index + 1}: {source_package}")
try:
# Ensure chroot exists for this parallel build
chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
if not self.chroot_manager.chroot_exists(chroot_name):
if not self.cache_manager.restore_root_cache(chroot_path):
self.chroot_manager.create_chroot(chroot_name)
# Check build dependencies
deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
if not deps_check["satisfied"]:
if deps_check["missing"]:
self.sbuild_wrapper.install_build_dependencies(deps_check["missing"], chroot_name)
# Setup build environment
build_env = self.config.setup_build_environment()
# Build the package
build_result = self.sbuild_wrapper.build_package(
source_package, chroot_name, build_env=build_env, **kwargs
)
# Create cache after successful build
if build_result.get("success", False):
self.cache_manager.create_root_cache(chroot_path)
# Capture and store metadata
metadata = self._capture_build_metadata(build_result, source_package)
self.metadata_manager.store_metadata(metadata)
return build_result
except Exception as e:
return {
'success': False,
'error': str(e),
'package_name': source_package,
'chroot_name': chroot_name
}
def build_chain(self, source_packages: List[str], **kwargs) -> List[Dict[str, Any]]:
"""Build a chain of packages that depend on each other (similar to Mock's --chain)"""
@@ -98,66 +273,52 @@ class DebMock:
try:
# Build the package
result = self.sbuild_wrapper.build_package(source_package, chroot_name, build_env=build_env, **kwargs)
                # Store result
                results.append(result)

                # If build failed, stop the chain
                if not result.get("success", False):
                    print(f"Chain build failed at package {i+1}: {source_package}")
                    break

                # Install the built package for dependency resolution
                if result.get("success", False) and kwargs.get("install_built", True):
                    self._install_built_package(result, chroot_name)
            except Exception as e:
                error_result = {
                    "success": False,
                    "error": str(e),
                    "package": source_package,
                    "chain_position": i,
                }
                results.append(error_result)
                break
return results
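The chain builder is driven the same way; with `install_built=True` (the default) each successful package is installed into the chroot so the next build in the chain can depend on it. Package names are again placeholders:

chain_results = mock.build_chain(
    ["libfoo_1.0-1.dsc", "foo-tools_1.0-1.dsc"],
    install_built=True,
)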
def _install_built_package(self, build_result: Dict[str, Any], chroot_name: str) -> None:
"""Install a built package in the chroot for dependency resolution"""
try:
# Extract .deb files from build result
deb_files = build_result.get("artifacts", {}).get("deb_files", [])
for deb_file in deb_files:
if deb_file.endswith(".deb"):
# Copy .deb to chroot and install
self.chroot_manager.copy_in(deb_file, chroot_name, "/tmp/")
# Install the package
install_cmd = ["dpkg", "-i", f"/tmp/{os.path.basename(deb_file)}"]
self.chroot_manager.execute_in_chroot(chroot_name, install_cmd)
# Fix any broken dependencies
fix_cmd = ["apt-get", "install", "-f", "-y"]
self.chroot_manager.execute_in_chroot(chroot_name, fix_cmd)
except Exception as e:
print(f"Warning: Failed to install built package: {e}")
def init_chroot(self, chroot_name: str, arch: str = None, suite: str = None) -> None:
"""Initialize a new chroot environment"""
@@ -391,14 +552,14 @@ class DebMock:
        result = self.chroot_manager.execute_in_chroot(
            chroot_name,
            f"{self.config.apt_install_command} {' '.join(packages)}",
        )

        return {
            "success": result.returncode == 0,
            "installed": packages,
            "output": result.stdout,
            "error": result.stderr if result.returncode != 0 else None,
        }
def update_packages(self, packages: List[str] = None) -> Dict[str, Any]:
@@ -416,13 +577,13 @@ class DebMock:
        # Update all packages
        cmd = f"{self.config.apt_command} update && {self.config.apt_command} upgrade -y"
        result = self.chroot_manager.execute_in_chroot(chroot_name, cmd)

        return {
            "success": result.returncode == 0,
            "updated": packages if packages else "all",
            "output": result.stdout,
            "error": result.stderr if result.returncode != 0 else None,
        }
def remove_packages(self, packages: List[str]) -> Dict[str, Any]:
@@ -435,13 +596,13 @@ class DebMock:
        # Remove packages using APT
        cmd = f"{self.config.apt_command} remove -y {' '.join(packages)}"
        result = self.chroot_manager.execute_in_chroot(chroot_name, cmd)

        return {
            "success": result.returncode == 0,
            "removed": packages,
            "output": result.stdout,
            "error": result.stderr if result.returncode != 0 else None,
        }
def execute_apt_command(self, command: str) -> Dict[str, Any]:
@@ -454,11 +615,11 @@ class DebMock:
        # Execute APT command
        cmd = f"{self.config.apt_command} {command}"
        result = self.chroot_manager.execute_in_chroot(chroot_name, cmd)

        return {
            "success": result.returncode == 0,
            "command": command,
            "output": result.stdout,
            "error": result.stderr if result.returncode != 0 else None,
        }


@@ -412,6 +412,23 @@ class ValidationError(DebMockError):
super().__init__(message, exit_code=12, context=context, suggestions=suggestions)
class UIDManagerError(DebMockError):
"""Raised when UID/GID management operations fail"""
def __init__(self, message, chroot_name=None, operation=None):
super().__init__(message)
self.chroot_name = chroot_name
self.operation = operation
def get_exit_code(self):
return 20 # UID management error
class PerformanceError(Exception):
"""Raised when performance monitoring or optimization fails"""
pass
# Convenience functions for common error patterns
def handle_exception(func):
"""

deb_mock/performance.py (new file, 1541 lines): diff suppressed because it is too large.

deb_mock/plugin.py (new file, 248 lines):

@@ -0,0 +1,248 @@
"""
Plugin system for deb-mock
Based on Fedora Mock's plugin architecture
"""
import importlib.machinery
import importlib.util
import sys
import os
import logging
from typing import Dict, List, Any, Callable, Optional
from pathlib import Path
from .exceptions import PluginError
class PluginManager:
"""Manages plugins for deb-mock"""
# Current API version
CURRENT_API_VERSION = "1.0"
def __init__(self, config):
self.config = config
self.logger = logging.getLogger(__name__)
# Plugin configuration
self.plugins = getattr(config, 'plugins', [])
self.plugin_conf = getattr(config, 'plugin_conf', {})
self.plugin_dir = getattr(config, 'plugin_dir', '/usr/share/deb-mock/plugins')
# Hook system
self._hooks = {}
self._initialized_plugins = []
# Plugin state tracking
self.already_initialized = False
def __repr__(self):
return f"<deb_mock.plugin.PluginManager: plugins={len(self.plugins)}, hooks={len(self._hooks)}>"
def init_plugins(self, deb_mock):
"""Initialize all enabled plugins"""
if self.already_initialized:
return
self.already_initialized = True
self.logger.info("Initializing plugins...")
# Update plugin configuration with deb-mock context
for key in list(self.plugin_conf.keys()):
if key.endswith('_opts'):
self.plugin_conf[key].update({
'basedir': getattr(deb_mock.config, 'basedir', '/var/lib/deb-mock'),
'chroot_dir': deb_mock.config.chroot_dir,
'output_dir': deb_mock.config.output_dir,
'cache_dir': deb_mock.config.cache_dir,
})
# Import and initialize plugins
for plugin_name in self.plugins:
if self.plugin_conf.get(f"{plugin_name}_enable", True):
try:
self._load_plugin(plugin_name, deb_mock)
except Exception as e:
self.logger.error(f"Failed to load plugin {plugin_name}: {e}")
if self.plugin_conf.get(f"{plugin_name}_required", False):
raise PluginError(f"Required plugin {plugin_name} failed to load: {e}")
self.logger.info(f"Plugin initialization complete. Loaded {len(self._initialized_plugins)} plugins")
def _load_plugin(self, plugin_name: str, deb_mock):
"""Load and initialize a single plugin"""
self.logger.debug(f"Loading plugin: {plugin_name}")
# Find plugin module
spec = importlib.machinery.PathFinder.find_spec(plugin_name, [self.plugin_dir])
if not spec:
# Try to find in local plugins directory
local_plugin_dir = os.path.join(os.getcwd(), 'plugins')
spec = importlib.machinery.PathFinder.find_spec(plugin_name, [local_plugin_dir])
if not spec:
raise PluginError(f"Plugin {plugin_name} not found in {self.plugin_dir} or local plugins directory")
# Load plugin module
        module = importlib.util.module_from_spec(spec)
        # Register the module before executing it, per the importlib recipe
        sys.modules[spec.name] = module
        spec.loader.exec_module(module)
# Validate plugin API version
if not hasattr(module, 'requires_api_version'):
raise PluginError(f'Plugin "{plugin_name}" doesn\'t specify required API version')
requested_api_version = module.requires_api_version
if requested_api_version != self.CURRENT_API_VERSION:
raise PluginError(f'Plugin version mismatch - requested = {requested_api_version}, current = {self.CURRENT_API_VERSION}')
# Check if plugin should run in bootstrap chroots
run_in_bootstrap = getattr(module, "run_in_bootstrap", True)
# Initialize plugin
plugin_conf = self.plugin_conf.get(f"{plugin_name}_opts", {})
module.init(self, plugin_conf, deb_mock)
self._initialized_plugins.append(plugin_name)
self.logger.info(f"Plugin {plugin_name} loaded successfully")
def call_hooks(self, stage: str, *args, **kwargs):
"""Call all hooks registered for a specific stage"""
required = kwargs.pop('required', False)
hooks = self._hooks.get(stage, [])
if required and not hooks:
raise PluginError(f"Feature {stage} is not provided by any of enabled plugins")
self.logger.debug(f"Calling {len(hooks)} hooks for stage: {stage}")
for hook in hooks:
try:
hook(*args, **kwargs)
except Exception as e:
self.logger.error(f"Hook {hook.__name__} failed for stage {stage}: {e}")
if required:
raise PluginError(f"Required hook {hook.__name__} failed: {e}")
def add_hook(self, stage: str, function: Callable):
"""Add a hook function for a specific stage"""
if stage not in self._hooks:
self._hooks[stage] = []
if function not in self._hooks[stage]:
self._hooks[stage].append(function)
self.logger.debug(f"Added hook {function.__name__} for stage {stage}")
def remove_hook(self, stage: str, function: Callable):
"""Remove a hook function from a specific stage"""
if stage in self._hooks and function in self._hooks[stage]:
self._hooks[stage].remove(function)
self.logger.debug(f"Removed hook {function.__name__} from stage {stage}")
def get_hooks(self, stage: str) -> List[Callable]:
"""Get all hooks registered for a specific stage"""
return self._hooks.get(stage, [])
def list_stages(self) -> List[str]:
"""List all available hook stages"""
return list(self._hooks.keys())
def get_plugin_info(self) -> Dict[str, Any]:
"""Get information about loaded plugins"""
return {
'total_plugins': len(self.plugins),
'loaded_plugins': self._initialized_plugins,
'available_stages': self.list_stages(),
'plugin_dir': self.plugin_dir,
'api_version': self.CURRENT_API_VERSION
}
# Standard hook stages for deb-mock
class HookStages:
"""Standard hook stages for deb-mock plugins"""
# Chroot lifecycle
PRECHROOT_INIT = "prechroot_init"
POSTCHROOT_INIT = "postchroot_init"
PRECHROOT_CLEAN = "prechroot_clean"
POSTCHROOT_CLEAN = "postchroot_clean"
# Build lifecycle
PREBUILD = "prebuild"
POSTBUILD = "postbuild"
BUILD_START = "build_start"
BUILD_END = "build_end"
# Package management
PRE_INSTALL_DEPS = "pre_install_deps"
POST_INSTALL_DEPS = "post_install_deps"
PRE_INSTALL_PACKAGE = "pre_install_package"
POST_INSTALL_PACKAGE = "post_install_package"
# Mount management
PRE_MOUNT = "pre_mount"
POST_MOUNT = "post_mount"
PRE_UNMOUNT = "pre_unmount"
POST_UNMOUNT = "post_unmount"
# Cache management
PRE_CACHE_CREATE = "pre_cache_create"
POST_CACHE_CREATE = "post_cache_create"
PRE_CACHE_RESTORE = "pre_cache_restore"
POST_CACHE_RESTORE = "post_cache_restore"
# Parallel build hooks
PRE_PARALLEL_BUILD = "pre_parallel_build"
POST_PARALLEL_BUILD = "post_parallel_build"
PARALLEL_BUILD_START = "parallel_build_start"
PARALLEL_BUILD_END = "parallel_build_end"
# Error handling
ON_ERROR = "on_error"
ON_WARNING = "on_warning"
# Custom stages can be added by plugins
CUSTOM = "custom"
# Plugin base class for easier plugin development
class BasePlugin:
"""Base class for deb-mock plugins"""
def __init__(self, plugin_manager, config, deb_mock):
self.plugin_manager = plugin_manager
self.config = config
self.deb_mock = deb_mock
self.logger = logging.getLogger(f"deb_mock.plugin.{self.__class__.__name__}")
# Register hooks
self._register_hooks()
def _register_hooks(self):
"""Override this method to register hooks"""
pass
def get_config(self, key: str, default=None):
"""Get plugin configuration value"""
return self.config.get(key, default)
def set_config(self, key: str, value):
"""Set plugin configuration value"""
self.config[key] = value
def log_info(self, message: str):
"""Log info message"""
self.logger.info(message)
def log_warning(self, message: str):
"""Log warning message"""
self.logger.warning(message)
def log_error(self, message: str):
"""Log error message"""
self.logger.error(message)
def log_debug(self, message: str):
"""Log debug message"""
self.logger.debug(message)
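Putting the pieces together, a plugin is a module that exposes `requires_api_version` and an `init()` entry point, typically delegating to a `BasePlugin` subclass. A minimal sketch (the timing behaviour is illustrative; the hook signatures mirror the `BUILD_START`/`BUILD_END` calls made from `DebMock.build` above):

import time

from deb_mock.plugin import BasePlugin, HookStages

requires_api_version = "1.0"


def init(plugin_manager, conf, deb_mock):
    BuildTimer(plugin_manager, conf, deb_mock)


class BuildTimer(BasePlugin):
    """Logs the wall-clock duration of each package build."""

    def _register_hooks(self):
        self.plugin_manager.add_hook(HookStages.BUILD_START, self._on_start)
        self.plugin_manager.add_hook(HookStages.BUILD_END, self._on_end)

    def _on_start(self, source_package, chroot_name, **kwargs):
        self._started = time.monotonic()
        self.log_info(f"build started: {source_package}")

    def _on_end(self, build_result, source_package, chroot_name, **kwargs):
        elapsed = time.monotonic() - self._started
        self.log_info(f"build finished: {source_package} in {elapsed:.1f}s")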


@@ -5,6 +5,8 @@ sbuild wrapper for deb-mock
import os
import subprocess
import tempfile
import grp
import pwd
from pathlib import Path
from typing import Any, Dict, List
@@ -16,6 +18,73 @@ class SbuildWrapper:
def __init__(self, config):
self.config = config
self._check_sbuild_requirements()
def _check_sbuild_requirements(self):
"""Check if sbuild requirements are met"""
# Check if sbuild is available
if not self._is_sbuild_available():
raise SbuildError("sbuild not found. Please install sbuild package.")
# Check if user is in sbuild group
if not self._is_user_in_sbuild_group():
raise SbuildError(
"User not in sbuild group. Please run 'sudo sbuild-adduser $USER' "
"and start a new shell session."
)
# Check if sbuild configuration exists
if not self._is_sbuild_configured():
self._setup_sbuild_config()
def _is_sbuild_available(self) -> bool:
"""Check if sbuild is available in PATH"""
try:
subprocess.run(["sbuild", "--version"], capture_output=True, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def _is_user_in_sbuild_group(self) -> bool:
"""Check if current user is in sbuild group"""
try:
current_user = pwd.getpwuid(os.getuid()).pw_name
sbuild_group = grp.getgrnam("sbuild")
return current_user in sbuild_group.gr_mem
except (KeyError, OSError):
return False
def _is_sbuild_configured(self) -> bool:
"""Check if sbuild configuration exists"""
config_paths = [
os.path.expanduser("~/.config/sbuild/config.pl"),
os.path.expanduser("~/.sbuildrc"),
"/etc/sbuild/sbuild.conf"
]
return any(os.path.exists(path) for path in config_paths)
def _setup_sbuild_config(self):
"""Setup basic sbuild configuration"""
config_dir = os.path.expanduser("~/.config/sbuild")
config_file = os.path.join(config_dir, "config.pl")
try:
os.makedirs(config_dir, exist_ok=True)
# Create minimal config
config_content = """#!/usr/bin/perl
# deb-mock sbuild configuration
$chroot_mode = "schroot";
$schroot = "schroot";
"""
with open(config_file, "w") as f:
f.write(config_content)
os.chmod(config_file, 0o644)
except Exception as e:
raise SbuildError(f"Failed to create sbuild configuration: {e}")
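Since the constructor runs all three probes and raises on failure, a preflight check is just an instantiation attempt. A sketch, assuming `SbuildError` is importable from `deb_mock.exceptions` and reusing `cfg` from the earlier Config sketch:

from deb_mock.sbuild import SbuildWrapper
from deb_mock.exceptions import SbuildError  # import path is an assumption

try:
    wrapper = SbuildWrapper(cfg)
    print("sbuild environment OK")
except SbuildError as e:
    print(f"sbuild preflight failed: {e}")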
def build_package(
self,
@@ -39,6 +108,10 @@ class SbuildWrapper:
output_dir = os.path.join(tempfile.gettempdir(), "deb-mock-output")
os.makedirs(output_dir, exist_ok=True)
# Validate source package
if not self._is_valid_source_package(source_package):
raise SbuildError(f"Invalid source package: {source_package}")
# Prepare sbuild command
cmd = self._prepare_sbuild_command(source_package, chroot_name, output_dir, **kwargs)
@@ -49,22 +122,35 @@ class SbuildWrapper:
env.update(self.config.build_env)
# Create temporary log file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as log_path:
            log_file = log_path.name

        try:
            # Execute sbuild
            result = self._execute_sbuild(cmd, log_file, env)

            # Parse build results
            build_info = self._parse_build_results(output_dir, log_file, result)

            return build_info
        finally:
            # Clean up temporary log file
            if os.path.exists(log_file):
                os.unlink(log_file)
def _is_valid_source_package(self, source_package: str) -> bool:
"""Check if source package is valid"""
# Check if it's a directory with debian/control
if os.path.isdir(source_package):
control_file = os.path.join(source_package, "debian", "control")
return os.path.exists(control_file)
# Check if it's a .dsc file
if source_package.endswith(".dsc"):
return os.path.exists(source_package)
return False
def _prepare_sbuild_command(self, source_package: str, chroot_name: str, output_dir: str, **kwargs) -> List[str]:
"""Prepare the sbuild command with all necessary options"""
@@ -95,9 +181,6 @@ class SbuildWrapper:
for option in kwargs["build_options"]:
cmd.extend(option.split())
# Source package
cmd.append(source_package)
@@ -288,3 +371,66 @@ class SbuildWrapper:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
raise SbuildError(f"Failed to install build dependencies: {e}")
def update_chroot(self, chroot_name: str = None) -> None:
"""Update the chroot to ensure it's current"""
if chroot_name is None:
chroot_name = self.config.chroot_name
try:
# Update package lists
cmd = ["schroot", "-c", chroot_name, "--", "apt-get", "update"]
subprocess.run(cmd, check=True)
# Upgrade packages
cmd = ["schroot", "-c", chroot_name, "--", "apt-get", "upgrade", "-y"]
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
raise SbuildError(f"Failed to update chroot: {e}")
def get_chroot_info(self, chroot_name: str = None) -> Dict[str, Any]:
"""Get information about a chroot"""
if chroot_name is None:
chroot_name = self.config.chroot_name
info = {
"name": chroot_name,
"status": "unknown",
"architecture": None,
"distribution": None,
"packages": [],
}
try:
# Get chroot status
cmd = ["schroot", "-i", "-c", chroot_name]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Parse schroot info output
for line in result.stdout.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
key = key.strip()
value = value.strip()
if key == "Status":
info["status"] = value
elif key == "Architecture":
info["architecture"] = value
elif key == "Distribution":
info["distribution"] = value
            # Get package count: run dpkg -l in the chroot and count the lines
            # here (passing a list together with shell=True would be incorrect)
            cmd = ["schroot", "-c", chroot_name, "--", "dpkg", "-l"]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                info["package_count"] = len(result.stdout.splitlines())
except subprocess.CalledProcessError:
pass
return info
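Reusing the wrapper from the preflight sketch, the info helper can be queried directly; the chroot name matches the trixie profile above:

info = wrapper.get_chroot_info("debian-trixie-amd64")
print(info["status"], info["architecture"], info.get("package_count", "?"))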

deb_mock/uid_manager.py (new file, 305 lines):

@@ -0,0 +1,305 @@
"""
UID/GID management for deb-mock
Based on Fedora Mock's UID management system
"""
import os
import grp
import pwd
import subprocess
import logging
from contextlib import contextmanager
from typing import Optional, Tuple, Dict, Any
from .exceptions import UIDManagerError
class UIDManager:
"""Manages UID/GID operations for deb-mock chroots"""
def __init__(self, config):
self.config = config
self.logger = logging.getLogger(__name__)
# Default user/group configuration
self.chroot_user = getattr(config, 'chroot_user', 'build')
self.chroot_group = getattr(config, 'chroot_group', 'build')
self.chroot_uid = getattr(config, 'chroot_uid', 1000)
self.chroot_gid = getattr(config, 'chroot_gid', 1000)
# Current user information
self.current_uid = os.getuid()
self.current_gid = os.getgid()
self.current_user = pwd.getpwuid(self.current_uid).pw_name
# Privilege stack for context management
self._privilege_stack = []
self._environment_stack = []
# Validate configuration
self._validate_config()
def _validate_config(self):
"""Validate UID/GID configuration"""
try:
# Check if chroot user/group exist on host
if hasattr(self.config, 'use_host_user') and self.config.use_host_user:
try:
pwd.getpwnam(self.chroot_user)
grp.getgrnam(self.chroot_group)
except KeyError as e:
self.logger.warning(f"Host user/group not found: {e}")
# Validate UID/GID ranges
if self.chroot_uid < 1000:
self.logger.warning(f"Chroot UID {self.chroot_uid} is below 1000")
if self.chroot_gid < 1000:
self.logger.warning(f"Chroot GID {self.chroot_gid} is below 1000")
except Exception as e:
raise UIDManagerError(f"UID configuration validation failed: {e}")
@contextmanager
def elevated_privileges(self):
"""Context manager for elevated privileges"""
self._push_privileges()
self._elevate_privileges()
try:
yield
finally:
self._restore_privileges()
def _push_privileges(self):
"""Save current privilege state"""
self._privilege_stack.append({
'ruid': os.getuid(),
'euid': os.geteuid(),
'rgid': os.getgid(),
'egid': os.getegid(),
})
self._environment_stack.append(dict(os.environ))
def _elevate_privileges(self):
"""Elevate to root privileges"""
try:
os.setregid(0, 0)
os.setreuid(0, 0)
except PermissionError:
raise UIDManagerError("Failed to elevate privileges - requires root access")
def _restore_privileges(self):
"""Restore previous privilege state"""
if not self._privilege_stack:
return
privs = self._privilege_stack.pop()
env = self._environment_stack.pop()
# Restore environment
os.environ.clear()
os.environ.update(env)
# Restore UID/GID
os.setregid(privs['rgid'], privs['egid'])
os.setreuid(privs['ruid'], privs['euid'])
def become_user(self, uid: int, gid: Optional[int] = None) -> None:
"""Become a specific user/group"""
if gid is None:
gid = uid
self._push_privileges()
self._elevate_privileges()
os.setregid(gid, gid)
os.setreuid(uid, uid)
def restore_privileges(self) -> None:
"""Restore previous privilege state"""
self._restore_privileges()
def change_owner(self, path: str, uid: Optional[int] = None, gid: Optional[int] = None, recursive: bool = False) -> None:
"""Change ownership of files/directories"""
if uid is None:
uid = self.chroot_uid
if gid is None:
gid = self.chroot_gid
with self.elevated_privileges():
self._tolerant_chown(path, uid, gid)
if recursive:
for root, dirs, files in os.walk(path):
for d in dirs:
self._tolerant_chown(os.path.join(root, d), uid, gid)
for f in files:
self._tolerant_chown(os.path.join(root, f), uid, gid)
def _tolerant_chown(self, path: str, uid: int, gid: int) -> None:
"""Change ownership without raising errors for missing files"""
try:
os.lchown(path, uid, gid)
except OSError as e:
if e.errno != 2: # ENOENT - No such file or directory
self.logger.warning(f"Failed to change ownership of {path}: {e}")
def create_chroot_user(self, chroot_path: str) -> None:
"""Create the build user in the chroot"""
with self.elevated_privileges():
try:
# Create group first
self._create_group_in_chroot(chroot_path, self.chroot_group, self.chroot_gid)
# Create user
self._create_user_in_chroot(chroot_path, self.chroot_user, self.chroot_uid, self.chroot_gid)
# Setup home directory
self._setup_home_directory(chroot_path)
self.logger.info(f"Created chroot user {self.chroot_user} (UID: {self.chroot_uid}, GID: {self.chroot_gid})")
except Exception as e:
raise UIDManagerError(f"Failed to create chroot user: {e}")
def _create_group_in_chroot(self, chroot_path: str, group_name: str, gid: int) -> None:
"""Create a group in the chroot"""
group_file = os.path.join(chroot_path, 'etc', 'group')
# Check if group already exists
if os.path.exists(group_file):
with open(group_file, 'r') as f:
for line in f:
if line.startswith(f"{group_name}:"):
return # Group already exists
# Create group entry
group_entry = f"{group_name}:x:{gid}:\n"
# Ensure /etc directory exists
os.makedirs(os.path.dirname(group_file), exist_ok=True)
# Append to group file
with open(group_file, 'a') as f:
f.write(group_entry)
def _create_user_in_chroot(self, chroot_path: str, username: str, uid: int, gid: int) -> None:
"""Create a user in the chroot"""
passwd_file = os.path.join(chroot_path, 'etc', 'passwd')
# Check if user already exists
if os.path.exists(passwd_file):
with open(passwd_file, 'r') as f:
for line in f:
if line.startswith(f"{username}:"):
return # User already exists
# Create user entry
user_entry = f"{username}:x:{uid}:{gid}:Build User:/home/{username}:/bin/bash\n"
# Ensure /etc directory exists
os.makedirs(os.path.dirname(passwd_file), exist_ok=True)
# Append to passwd file
with open(passwd_file, 'a') as f:
f.write(user_entry)
def _setup_home_directory(self, chroot_path: str) -> None:
"""Setup home directory for the build user"""
home_dir = os.path.join(chroot_path, 'home', self.chroot_user)
# Create home directory
os.makedirs(home_dir, exist_ok=True)
# Set ownership
self._tolerant_chown(home_dir, self.chroot_uid, self.chroot_gid)
# Set permissions
os.chmod(home_dir, 0o755)
def copy_host_user(self, chroot_path: str, username: str) -> None:
"""Copy a user from the host system to the chroot"""
try:
# Get user info from host
user_info = pwd.getpwnam(username)
uid = user_info.pw_uid
gid = user_info.pw_gid
# Get group info
group_info = grp.getgrgid(gid)
group_name = group_info.gr_name
# Create in chroot
self._create_group_in_chroot(chroot_path, group_name, gid)
self._create_user_in_chroot(chroot_path, username, uid, gid)
self.logger.info(f"Copied host user {username} (UID: {uid}, GID: {gid}) to chroot")
except KeyError as e:
raise UIDManagerError(f"Host user {username} not found: {e}")
except Exception as e:
raise UIDManagerError(f"Failed to copy host user {username}: {e}")
def setup_chroot_permissions(self, chroot_path: str) -> None:
"""Setup proper permissions for the chroot"""
with self.elevated_privileges():
try:
# Change ownership of key directories
key_dirs = [
'home',
'tmp',
'var/tmp',
'var/cache',
'var/log'
]
for dir_name in key_dirs:
dir_path = os.path.join(chroot_path, dir_name)
if os.path.exists(dir_path):
self._tolerant_chown(dir_path, self.chroot_uid, self.chroot_gid)
# Ensure proper permissions on /tmp
tmp_path = os.path.join(chroot_path, 'tmp')
if os.path.exists(tmp_path):
os.chmod(tmp_path, 0o1777)
self.logger.info("Chroot permissions setup complete")
except Exception as e:
raise UIDManagerError(f"Failed to setup chroot permissions: {e}")
def get_user_info(self) -> Dict[str, Any]:
"""Get current user information"""
return {
'current_uid': self.current_uid,
'current_gid': self.current_gid,
'current_user': self.current_user,
'chroot_user': self.chroot_user,
'chroot_group': self.chroot_group,
'chroot_uid': self.chroot_uid,
'chroot_gid': self.chroot_gid
}
def validate_chroot_user(self, chroot_path: str) -> bool:
"""Validate that the chroot user exists and is properly configured"""
passwd_file = os.path.join(chroot_path, 'etc', 'passwd')
group_file = os.path.join(chroot_path, 'etc', 'group')
if not os.path.exists(passwd_file) or not os.path.exists(group_file):
return False
# Check if user exists
user_exists = False
group_exists = False
with open(passwd_file, 'r') as f:
for line in f:
if line.startswith(f"{self.chroot_user}:"):
user_exists = True
break
with open(group_file, 'r') as f:
for line in f:
if line.startswith(f"{self.chroot_group}:"):
group_exists = True
break
return user_exists and group_exists
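A closing sketch of the UID manager in action; the chroot path is illustrative, and the privilege-elevation steps require running as root (or with the ability to setuid):

from deb_mock.uid_manager import UIDManager

uid_mgr = UIDManager(cfg)
chroot = "/var/lib/deb-mock/chroots/debian-trixie-amd64"  # hypothetical path

uid_mgr.create_chroot_user(chroot)
uid_mgr.setup_chroot_permissions(chroot)

if not uid_mgr.validate_chroot_user(chroot):
    raise RuntimeError("chroot user was not created correctly")
print(uid_mgr.get_user_info())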