# deb-mock/deb_mock/benchmarking.py
"""
Advanced benchmarking system for deb-mock
"""
import time
import psutil
import threading
import json
import os
import sys
import statistics
import subprocess
from pathlib import Path
from typing import Dict, List, Any, Optional, Callable, Tuple
from contextlib import contextmanager
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
from .exceptions import PerformanceError
@dataclass
class BenchmarkConfig:
"""Configuration for benchmarking"""
name: str
description: str
iterations: int
warmup_iterations: int
parallel_runs: int
timeout_seconds: int
collect_system_metrics: bool
collect_detailed_metrics: bool
output_format: str # json, html, csv
output_file: Optional[str]
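# Example of a custom configuration (a sketch; the values are arbitrary and
# such an object can be passed to BenchmarkRunner.run_benchmark via its
# `config` parameter instead of using one of the built-in templates):
#
#     nightly_config = BenchmarkConfig(
#         name="Nightly Build Benchmark",
#         description="Moderate run for a nightly CI job",
#         iterations=30,
#         warmup_iterations=5,
#         parallel_runs=2,
#         timeout_seconds=900,
#         collect_system_metrics=True,
#         collect_detailed_metrics=True,
#         output_format="json",
#         output_file="nightly_benchmark.json",
#     )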
@dataclass
class BenchmarkMetrics:
"""Metrics collected during benchmarking"""
timestamp: datetime
duration: float
cpu_percent: float
memory_mb: float
disk_io_read_mb: float
disk_io_write_mb: float
network_io_mb: float
chroot_size_mb: float
cache_hit_rate: float
parallel_efficiency: float
resource_utilization: float
# System-level metrics
system_cpu_percent: float
system_memory_percent: float
system_load_average: Tuple[float, float, float]
system_disk_usage_percent: float
system_network_connections: int
@dataclass
class BenchmarkResult:
"""Result of a benchmark run"""
benchmark_name: str
config: BenchmarkConfig
start_time: datetime
end_time: datetime
total_duration: float
iterations: int
successful_iterations: int
failed_iterations: int
# Performance statistics
durations: List[float]
average_duration: float
min_duration: float
max_duration: float
median_duration: float
standard_deviation: float
coefficient_of_variation: float
# Percentiles
percentiles: Dict[str, float]
# System impact
system_impact: Dict[str, float]
# Detailed metrics
metrics: List[BenchmarkMetrics]
# Analysis
analysis: Dict[str, Any]
recommendations: List[str]
# Metadata
system_info: Dict[str, Any]
benchmark_version: str
class BenchmarkRunner:
"""Advanced benchmark runner for deb-mock operations"""
def __init__(self, config):
self.config = config
self.logger = logging.getLogger(__name__)
# Benchmark history
self._benchmark_history = []
self._benchmark_results = {}
# System information
self._system_info = self._collect_system_info()
# Benchmark templates
self._benchmark_templates = self._load_benchmark_templates()
# Performance baselines
self._performance_baselines = {}
self._load_performance_baselines()
def _collect_system_info(self) -> Dict[str, Any]:
"""Collect comprehensive system information"""
try:
# CPU information
cpu_info = {
"count": psutil.cpu_count(),
"count_logical": psutil.cpu_count(logical=True),
"freq": psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None,
"architecture": os.uname().machine if hasattr(os, 'uname') else "unknown"
}
# Memory information
memory = psutil.virtual_memory()
memory_info = {
"total_gb": memory.total / (1024**3),
"available_gb": memory.available / (1024**3),
"percent": memory.percent
}
# Disk information
disk = psutil.disk_usage('/')
disk_info = {
"total_gb": disk.total / (1024**3),
"free_gb": disk.free / (1024**3),
"percent": disk.percent
}
# OS information
os_info = {
"platform": os.uname().sysname if hasattr(os, 'uname') else "unknown",
"release": os.uname().release if hasattr(os, 'uname') else "unknown",
"version": os.uname().version if hasattr(os, 'uname') else "unknown"
}
# Python information
python_info = {
"version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
"implementation": sys.implementation.name,
"platform": sys.platform
}
return {
"cpu": cpu_info,
"memory": memory_info,
"disk": disk_info,
"os": os_info,
"python": python_info,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"Failed to collect system info: {e}")
return {"error": str(e)}
def _load_benchmark_templates(self) -> Dict[str, BenchmarkConfig]:
"""Load predefined benchmark templates"""
templates = {
"quick": BenchmarkConfig(
name="Quick Benchmark",
description="Fast benchmark with minimal iterations",
iterations=5,
warmup_iterations=1,
parallel_runs=1,
timeout_seconds=300,
collect_system_metrics=True,
collect_detailed_metrics=False,
output_format="json",
output_file=None
),
"standard": BenchmarkConfig(
name="Standard Benchmark",
description="Standard benchmark with moderate iterations",
iterations=20,
warmup_iterations=3,
parallel_runs=2,
timeout_seconds=600,
collect_system_metrics=True,
collect_detailed_metrics=True,
output_format="html",
output_file=None
),
"comprehensive": BenchmarkConfig(
name="Comprehensive Benchmark",
description="Comprehensive benchmark with many iterations",
iterations=100,
warmup_iterations=10,
parallel_runs=4,
timeout_seconds=1800,
collect_system_metrics=True,
collect_detailed_metrics=True,
output_format="html",
output_file=None
),
"stress": BenchmarkConfig(
name="Stress Test",
description="Stress test with high load",
iterations=50,
warmup_iterations=5,
parallel_runs=8,
timeout_seconds=1200,
collect_system_metrics=True,
collect_detailed_metrics=True,
output_format="json",
output_file=None
)
}
return templates
def _load_performance_baselines(self):
"""Load performance baselines for comparison"""
baseline_file = os.path.join(getattr(self.config, 'performance_metrics_dir', './performance-metrics'), "baselines.json")
if os.path.exists(baseline_file):
try:
with open(baseline_file, 'r') as f:
self._performance_baselines = json.load(f)
self.logger.info("Loaded performance baselines for benchmarking")
except Exception as e:
self.logger.warning(f"Failed to load baselines: {e}")
def run_benchmark(self, benchmark_name: str, operation_func: Callable,
operation_args: Tuple = (), operation_kwargs: Optional[Dict] = None,
config: Optional[BenchmarkConfig] = None) -> BenchmarkResult:
"""Run a benchmark for a specific operation"""
if operation_kwargs is None:
operation_kwargs = {}
# Use template if no config provided
if config is None:
if benchmark_name in self._benchmark_templates:
config = self._benchmark_templates[benchmark_name]
else:
config = self._benchmark_templates["standard"]
self.logger.info(f"Starting benchmark: {benchmark_name}")
self.logger.info(f"Configuration: {iterations} iterations, {parallel_runs} parallel runs")
start_time = datetime.now()
results = []
metrics_list = []
# Warmup runs
if config.warmup_iterations > 0:
self.logger.info(f"Running {config.warmup_iterations} warmup iterations")
for i in range(config.warmup_iterations):
try:
operation_func(*operation_args, **operation_kwargs)
except Exception as e:
self.logger.warning(f"Warmup iteration {i+1} failed: {e}")
# Main benchmark runs
self.logger.info(f"Running {config.iterations} benchmark iterations")
if config.parallel_runs > 1:
results = self._run_parallel_benchmark(operation_func, operation_args, operation_kwargs, config)
else:
results = self._run_sequential_benchmark(operation_func, operation_args, operation_kwargs, config)
# Collect system metrics if enabled
if config.collect_system_metrics:
metrics_list = self._collect_benchmark_metrics(results, config)
# Calculate statistics
durations = [r["duration"] for r in results if r["success"]]
successful_iterations = len(durations)
failed_iterations = len(results) - successful_iterations
if not durations:
raise PerformanceError("No successful benchmark iterations")
# Calculate performance statistics
stats = self._calculate_performance_statistics(durations)
# Calculate system impact
system_impact = self._calculate_system_impact(metrics_list) if metrics_list else {}
# Generate analysis and recommendations
analysis = self._analyze_benchmark_results(stats, system_impact)
recommendations = self._generate_benchmark_recommendations(analysis, stats)
# Create benchmark result
end_time = datetime.now()
total_duration = (end_time - start_time).total_seconds()
benchmark_result = BenchmarkResult(
benchmark_name=benchmark_name,
config=config,
start_time=start_time,
end_time=end_time,
total_duration=total_duration,
iterations=config.iterations,
successful_iterations=successful_iterations,
failed_iterations=failed_iterations,
durations=durations,
average_duration=stats["average"],
min_duration=stats["min"],
max_duration=stats["max"],
median_duration=stats["median"],
standard_deviation=stats["std_dev"],
coefficient_of_variation=stats["cv"],
percentiles=stats["percentiles"],
system_impact=system_impact,
metrics=metrics_list,
analysis=analysis,
recommendations=recommendations,
system_info=self._system_info,
benchmark_version="1.0.0"
)
# Store result
self._benchmark_results[benchmark_name] = benchmark_result
self._benchmark_history.append(benchmark_result)
# Save result
self._save_benchmark_result(benchmark_result)
self.logger.info(f"Benchmark completed: {benchmark_name}")
self.logger.info(f"Results: {successful_iterations}/{config.iterations} successful, "
f"avg duration: {stats['average']:.3f}s")
return benchmark_result
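# Sketch of a call that forwards positional and keyword arguments to the
# operation under test (the chroot_manager name is hypothetical; any callable
# works, and an unknown benchmark name falls back to the "standard" template):
#
#     result = runner.run_benchmark(
#         "chroot-init",
#         chroot_manager.create_chroot,
#         operation_args=("bookworm-amd64",),
#         operation_kwargs={"clean": True},
#     )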
def _run_sequential_benchmark(self, operation_func: Callable, operation_args: Tuple,
operation_kwargs: Dict, config: BenchmarkConfig) -> List[Dict[str, Any]]:
"""Run benchmark iterations sequentially"""
results = []
for i in range(config.iterations):
self.logger.debug(f"Running iteration {i+1}/{config.iterations}")
try:
start_time = time.time()
result = operation_func(*operation_args, **operation_kwargs)
end_time = time.time()
iteration_result = {
"iteration": i + 1,
"success": True,
"duration": end_time - start_time,
"result": result,
"timestamp": datetime.now()
}
results.append(iteration_result)
except Exception as e:
self.logger.warning(f"Iteration {i+1} failed: {e}")
iteration_result = {
"iteration": i + 1,
"success": False,
"duration": 0,
"error": str(e),
"timestamp": datetime.now()
}
results.append(iteration_result)
return results
def _run_parallel_benchmark(self, operation_func: Callable, operation_args: Tuple,
operation_kwargs: Dict, config: BenchmarkConfig) -> List[Dict[str, Any]]:
"""Run benchmark iterations in parallel"""
results = []
def run_iteration(iteration_num):
try:
start_time = time.time()
result = operation_func(*operation_args, **operation_kwargs)
end_time = time.time()
return {
"iteration": iteration_num,
"success": True,
"duration": end_time - start_time,
"result": result,
"timestamp": datetime.now()
}
except Exception as e:
self.logger.warning(f"Iteration {iteration_num} failed: {e}")
return {
"iteration": iteration_num,
"success": False,
"duration": 0,
"error": str(e),
"timestamp": datetime.now()
}
# Use ThreadPoolExecutor for parallel execution
with ThreadPoolExecutor(max_workers=config.parallel_runs) as executor:
future_to_iteration = {
executor.submit(run_iteration, i + 1): i + 1
for i in range(config.iterations)
}
for future in as_completed(future_to_iteration):
result = future.result()
results.append(result)
# Sort results by iteration number
results.sort(key=lambda x: x["iteration"])
return results
def _collect_benchmark_metrics(self, results: List[Dict[str, Any]],
config: BenchmarkConfig) -> List[BenchmarkMetrics]:
"""Collect system metrics during benchmarking"""
metrics_list = []
for result in results:
if not result["success"]:
continue
try:
# Collect system metrics
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
disk_io = psutil.disk_io_counters()
net_io = psutil.net_io_counters()
# Get load average if available
try:
load_avg = os.getloadavg()
except (OSError, AttributeError):
load_avg = (0.0, 0.0, 0.0)
# Get disk usage
disk_usage = psutil.disk_usage('/')
# Get network connections count
try:
net_connections = len(psutil.net_connections())
except (OSError, psutil.AccessDenied):
net_connections = 0
metrics = BenchmarkMetrics(
timestamp=result["timestamp"],
duration=result["duration"],
cpu_percent=cpu_percent,
memory_mb=memory.used / (1024 * 1024),
disk_io_read_mb=disk_io.read_bytes / (1024 * 1024) if disk_io else 0,
disk_io_write_mb=disk_io.write_bytes / (1024 * 1024) if disk_io else 0,
network_io_mb=(net_io.bytes_sent + net_io.bytes_recv) / (1024 * 1024) if net_io else 0,
chroot_size_mb=0, # Would need to be calculated from actual chroot
cache_hit_rate=0.0, # Would need to be calculated from cache metrics
parallel_efficiency=1.0, # Would need to be calculated
resource_utilization=0.0, # Would need to be calculated
system_cpu_percent=cpu_percent,
system_memory_percent=memory.percent,
system_load_average=load_avg,
system_disk_usage_percent=disk_usage.percent,
system_network_connections=net_connections
)
metrics_list.append(metrics)
except Exception as e:
self.logger.warning(f"Failed to collect metrics for iteration {result['iteration']}: {e}")
return metrics_list
def _calculate_performance_statistics(self, durations: List[float]) -> Dict[str, Any]:
"""Calculate comprehensive performance statistics"""
if not durations:
return {}
# Basic statistics
avg_duration = statistics.mean(durations)
min_duration = min(durations)
max_duration = max(durations)
median_duration = statistics.median(durations)
# Standard deviation and coefficient of variation
try:
std_dev = statistics.stdev(durations)
cv = std_dev / avg_duration if avg_duration > 0 else 0
except statistics.StatisticsError:
std_dev = 0
cv = 0
# Percentiles
sorted_durations = sorted(durations)
percentiles = {
"p10": sorted_durations[int(0.1 * len(sorted_durations))],
"p25": sorted_durations[int(0.25 * len(sorted_durations))],
"p50": sorted_durations[int(0.5 * len(sorted_durations))],
"p75": sorted_durations[int(0.75 * len(sorted_durations))],
"p90": sorted_durations[int(0.9 * len(sorted_durations))],
"p95": sorted_durations[int(0.95 * len(sorted_durations))],
"p99": sorted_durations[int(0.99 * len(sorted_durations))]
}
return {
"durations": durations,  # kept so _analyze_benchmark_results can scan for anomalies
"average": avg_duration,
"min": min_duration,
"max": max_duration,
"median": median_duration,
"std_dev": std_dev,
"cv": cv,
"percentiles": percentiles
}
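# Note on the percentile scheme above: it is a simple index-based
# (nearest-rank style) lookup, so small samples collapse the upper
# percentiles. For example, with 20 sorted durations, p95 and p99 both
# resolve to index int(0.95 * 20) == int(0.99 * 20) == 19, i.e. the
# maximum observed duration.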
def _calculate_system_impact(self, metrics_list: List[BenchmarkMetrics]) -> Dict[str, float]:
"""Calculate system impact during benchmarking"""
if not metrics_list:
return {}
# Calculate averages across all metrics
avg_cpu = statistics.mean(m.cpu_percent for m in metrics_list)
avg_memory = statistics.mean(m.memory_mb for m in metrics_list)
avg_disk_read = statistics.mean(m.disk_io_read_mb for m in metrics_list)
avg_disk_write = statistics.mean(m.disk_io_write_mb for m in metrics_list)
avg_network = statistics.mean(m.network_io_mb for m in metrics_list)
# Calculate peak values
peak_cpu = max(m.cpu_percent for m in metrics_list)
peak_memory = max(m.memory_mb for m in metrics_list)
return {
"avg_cpu_percent": avg_cpu,
"avg_memory_mb": avg_memory,
"avg_disk_read_mb": avg_disk_read,
"avg_disk_write_mb": avg_disk_write,
"avg_network_mb": avg_network,
"peak_cpu_percent": peak_cpu,
"peak_memory_mb": peak_memory
}
def _analyze_benchmark_results(self, stats: Dict[str, Any],
system_impact: Dict[str, float]) -> Dict[str, Any]:
"""Analyze benchmark results for insights"""
analysis = {
"performance_stability": "unknown",
"system_impact_level": "unknown",
"optimization_opportunities": [],
"anomalies": []
}
# Analyze performance stability
cv = stats.get("cv", 0)
if cv < 0.1:
analysis["performance_stability"] = "excellent"
elif cv < 0.2:
analysis["performance_stability"] = "good"
elif cv < 0.3:
analysis["performance_stability"] = "fair"
else:
analysis["performance_stability"] = "poor"
analysis["optimization_opportunities"].append("High performance variability detected")
# Analyze system impact
avg_cpu = system_impact.get("avg_cpu_percent", 0)
avg_memory = system_impact.get("avg_memory_mb", 0)
if avg_cpu < 30:
analysis["system_impact_level"] = "low"
analysis["optimization_opportunities"].append("CPU utilization is low, consider increasing parallelization")
elif avg_cpu < 70:
analysis["system_impact_level"] = "moderate"
else:
analysis["system_impact_level"] = "high"
analysis["optimization_opportunities"].append("High CPU utilization, consider reducing load")
if avg_memory > 2048: # 2GB
analysis["optimization_opportunities"].append("High memory usage, consider optimizing memory allocation")
# Detect anomalies
durations = stats.get("durations", [])
if durations:
avg_duration = stats.get("average", 0)
for duration in durations:
if abs(duration - avg_duration) > 2 * stats.get("std_dev", 0):
analysis["anomalies"].append(f"Duration anomaly: {duration:.3f}s (avg: {avg_duration:.3f}s)")
return analysis
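# Worked example of the stability classification: with an average duration of
# 10.0 s and a standard deviation of 1.5 s, cv = 1.5 / 10.0 = 0.15, which
# falls in the 0.1-0.2 band and is reported as "good".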
def _generate_benchmark_recommendations(self, analysis: Dict[str, Any],
stats: Dict[str, Any]) -> List[str]:
"""Generate actionable recommendations based on benchmark results"""
recommendations = []
# Performance stability recommendations
stability = analysis.get("performance_stability", "unknown")
if stability in ["fair", "poor"]:
recommendations.append("Investigate performance variability - check for external factors affecting performance")
recommendations.append("Consider running more iterations to get more stable results")
# System impact recommendations
impact_level = analysis.get("system_impact_level", "unknown")
if impact_level == "low":
recommendations.append("System resources are underutilized - consider increasing workload or parallelization")
elif impact_level == "high":
recommendations.append("System is under high load - consider reducing workload or optimizing operations")
# Optimization recommendations
for opportunity in analysis.get("optimization_opportunities", []):
recommendations.append(opportunity)
# General recommendations
if stats.get("cv", 0) > 0.2:
recommendations.append("High coefficient of variation suggests inconsistent performance - investigate root causes")
if len(recommendations) == 0:
recommendations.append("Performance is within acceptable parameters - continue monitoring")
return recommendations
def _save_benchmark_result(self, result: BenchmarkResult):
"""Save benchmark result to file"""
try:
metrics_dir = getattr(self.config, 'performance_metrics_dir', './performance-metrics')
os.makedirs(metrics_dir, exist_ok=True)
timestamp = result.start_time.strftime("%Y%m%d_%H%M%S")
filename = f"benchmark_{result.benchmark_name}_{timestamp}.json"
filepath = os.path.join(metrics_dir, filename)
# Convert to dict for JSON serialization
result_dict = asdict(result)
result_dict["start_time"] = result.start_time.isoformat()
result_dict["end_time"] = result.end_time.isoformat()
result_dict["timestamp"] = result.timestamp.isoformat()
with open(filepath, 'w') as f:
json.dump(result_dict, f, indent=2, default=str)
self.logger.info(f"Benchmark result saved: {filepath}")
except Exception as e:
self.logger.error(f"Failed to save benchmark result: {e}")
def compare_benchmarks(self, benchmark_names: List[str]) -> Dict[str, Any]:
"""Compare multiple benchmark results"""
if len(benchmark_names) < 2:
raise ValueError("Need at least 2 benchmark names for comparison")
comparison = {
"benchmarks": benchmark_names,
"comparison_date": datetime.now().isoformat(),
"results": {},
"analysis": {},
"recommendations": []
}
# Collect benchmark results
for name in benchmark_names:
if name in self._benchmark_results:
result = self._benchmark_results[name]
comparison["results"][name] = {
"average_duration": result.average_duration,
"min_duration": result.min_duration,
"max_duration": result.max_duration,
"standard_deviation": result.standard_deviation,
"coefficient_of_variation": result.coefficient_of_variation,
"successful_iterations": result.successful_iterations,
"total_iterations": result.iterations
}
# Perform comparison analysis
if len(comparison["results"]) >= 2:
comparison["analysis"] = self._analyze_benchmark_comparison(comparison["results"])
comparison["recommendations"] = self._generate_comparison_recommendations(comparison["analysis"])
return comparison
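# Comparison sketch (assumes both benchmarks were run earlier on the same
# BenchmarkRunner instance, since results are looked up in memory):
#
#     comparison = runner.compare_benchmarks(["quick", "standard"])
#     print(comparison["analysis"]["fastest_benchmark"])
#     for hint in comparison["recommendations"]:
#         print("-", hint)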
def _analyze_benchmark_comparison(self, results: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze comparison between benchmark results"""
analysis = {
"fastest_benchmark": None,
"slowest_benchmark": None,
"most_stable_benchmark": None,
"least_stable_benchmark": None,
"performance_differences": {},
"stability_differences": {}
}
if len(results) < 2:
return analysis
# Find fastest and slowest
avg_durations = {name: data["average_duration"] for name, data in results.items()}
fastest = min(avg_durations, key=avg_durations.get)
slowest = max(avg_durations, key=avg_durations.get)
analysis["fastest_benchmark"] = fastest
analysis["slowest_benchmark"] = slowest
# Find most and least stable
cv_values = {name: data["coefficient_of_variation"] for name, data in results.items()}
most_stable = min(cv_values, key=cv_values.get)
least_stable = max(cv_values, key=cv_values.get)
analysis["most_stable_benchmark"] = most_stable
analysis["least_stable_benchmark"] = least_stable
# Calculate performance differences
fastest_avg = avg_durations[fastest]
for name, data in results.items():
if name != fastest:
diff_percent = ((data["average_duration"] - fastest_avg) / fastest_avg) * 100
analysis["performance_differences"][name] = {
"vs_fastest_percent": diff_percent,
"vs_fastest_seconds": data["average_duration"] - fastest_avg
}
# Calculate stability differences
most_stable_cv = cv_values[most_stable]
for name, data in results.items():
if name != most_stable:
cv_diff = data["coefficient_of_variation"] - most_stable_cv
analysis["stability_differences"][name] = {
"vs_most_stable_cv": cv_diff,
"stability_ratio": data["coefficient_of_variation"] / most_stable_cv
}
return analysis
def _generate_comparison_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
"""Generate recommendations based on benchmark comparison"""
recommendations = []
fastest = analysis.get("fastest_benchmark")
slowest = analysis.get("slowest_benchmark")
most_stable = analysis.get("most_stable_benchmark")
least_stable = analysis.get("least_stable_benchmark")
if fastest and slowest and fastest != slowest:
slowdown_percent = analysis["performance_differences"][slowest]["vs_fastest_percent"]
recommendations.append(f"Benchmark '{slowest}' is {slowdown_percent:.1f}% slower than '{fastest}' - investigate performance differences")
if most_stable and least_stable and most_stable != least_stable:
stability_ratio = analysis["stability_differences"][least_stable]["stability_ratio"]
recommendations.append(f"Benchmark '{least_stable}' is {stability_ratio:.2f}x less stable than '{most_stable}' - investigate variability causes")
# General recommendations
if len(analysis.get("performance_differences", {})) > 0:
recommendations.append("Consider using the fastest benchmark configuration for production")
if len(analysis.get("stability_differences", {})) > 0:
recommendations.append("Consider using the most stable benchmark configuration for critical operations")
return recommendations
def list_benchmarks(self) -> List[str]:
"""List all available benchmark templates"""
return list(self._benchmark_templates.keys())
def get_benchmark_result(self, benchmark_name: str) -> Optional[BenchmarkResult]:
"""Get a specific benchmark result"""
return self._benchmark_results.get(benchmark_name)
def get_benchmark_history(self) -> List[BenchmarkResult]:
"""Get all benchmark results"""
return self._benchmark_history.copy()
def clear_benchmark_history(self):
"""Clear benchmark history"""
self._benchmark_history.clear()
self._benchmark_results.clear()
self.logger.info("Benchmark history cleared")