- Add complete pytest testing framework with conftest.py and test files
- Add performance monitoring and benchmarking capabilities
- Add plugin system with ccache plugin example
- Add comprehensive documentation (API, deployment, testing, etc.)
- Add Docker API wrapper for service deployment
- Add advanced configuration examples
- Remove old wget package file
- Update core modules with enhanced functionality
"""
|
|
Advanced benchmarking system for deb-mock
|
|
"""
|
|
|
|
import time
|
|
import psutil
|
|
import threading
|
|
import json
|
|
import os
|
|
import statistics
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional, Callable, Tuple
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import multiprocessing
|
|
|
|
from .exceptions import PerformanceError
|
|
|
|
|
|


@dataclass
class BenchmarkConfig:
    """Configuration for benchmarking"""
    name: str
    description: str
    iterations: int
    warmup_iterations: int
    parallel_runs: int
    timeout_seconds: int
    collect_system_metrics: bool
    collect_detailed_metrics: bool
    output_format: str  # json, html, csv
    output_file: Optional[str]
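
# Illustrative only: a custom configuration can also be built directly and
# passed to BenchmarkRunner.run_benchmark(..., config=custom_config), e.g.
#
#   custom_config = BenchmarkConfig(
#       name="Nightly", description="Nightly regression run",
#       iterations=50, warmup_iterations=5, parallel_runs=2,
#       timeout_seconds=900, collect_system_metrics=True,
#       collect_detailed_metrics=True, output_format="json", output_file=None,
#   )
#
# The field values above are placeholders, not project defaults.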


@dataclass
class BenchmarkMetrics:
    """Metrics collected during benchmarking"""
    timestamp: datetime
    duration: float
    cpu_percent: float
    memory_mb: float
    disk_io_read_mb: float
    disk_io_write_mb: float
    network_io_mb: float
    chroot_size_mb: float
    cache_hit_rate: float
    parallel_efficiency: float
    resource_utilization: float
    # System-level metrics
    system_cpu_percent: float
    system_memory_percent: float
    system_load_average: Tuple[float, float, float]
    system_disk_usage_percent: float
    system_network_connections: int


@dataclass
class BenchmarkResult:
    """Result of a benchmark run"""
    benchmark_name: str
    config: BenchmarkConfig
    start_time: datetime
    end_time: datetime
    total_duration: float
    iterations: int
    successful_iterations: int
    failed_iterations: int

    # Performance statistics
    durations: List[float]
    average_duration: float
    min_duration: float
    max_duration: float
    median_duration: float
    standard_deviation: float
    coefficient_of_variation: float

    # Percentiles
    percentiles: Dict[str, float]

    # System impact
    system_impact: Dict[str, float]

    # Detailed metrics
    metrics: List[BenchmarkMetrics]

    # Analysis
    analysis: Dict[str, Any]
    recommendations: List[str]

    # Metadata
    system_info: Dict[str, Any]
    benchmark_version: str


class BenchmarkRunner:
    """Advanced benchmark runner for deb-mock operations"""

    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Benchmark history
        self._benchmark_history = []
        self._benchmark_results = {}

        # System information
        self._system_info = self._collect_system_info()

        # Benchmark templates
        self._benchmark_templates = self._load_benchmark_templates()

        # Performance baselines
        self._performance_baselines = {}
        self._load_performance_baselines()

    def _collect_system_info(self) -> Dict[str, Any]:
        """Collect comprehensive system information"""
        try:
            # CPU information
            cpu_info = {
                "count": psutil.cpu_count(logical=False),
                "count_logical": psutil.cpu_count(logical=True),
                "freq": psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None,
                "architecture": os.uname().machine if hasattr(os, 'uname') else "unknown"
            }

            # Memory information
            memory = psutil.virtual_memory()
            memory_info = {
                "total_gb": memory.total / (1024**3),
                "available_gb": memory.available / (1024**3),
                "percent": memory.percent
            }

            # Disk information
            disk = psutil.disk_usage('/')
            disk_info = {
                "total_gb": disk.total / (1024**3),
                "free_gb": disk.free / (1024**3),
                "percent": disk.percent
            }

            # OS information
            os_info = {
                "platform": os.uname().sysname if hasattr(os, 'uname') else "unknown",
                "release": os.uname().release if hasattr(os, 'uname') else "unknown",
                "version": os.uname().version if hasattr(os, 'uname') else "unknown"
            }

            # Python information
            python_info = {
                "version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
                "implementation": sys.implementation.name,
                "platform": sys.platform
            }

            return {
                "cpu": cpu_info,
                "memory": memory_info,
                "disk": disk_info,
                "os": os_info,
                "python": python_info,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to collect system info: {e}")
            return {"error": str(e)}

    def _load_benchmark_templates(self) -> Dict[str, BenchmarkConfig]:
        """Load predefined benchmark templates"""
        templates = {
            "quick": BenchmarkConfig(
                name="Quick Benchmark",
                description="Fast benchmark with minimal iterations",
                iterations=5,
                warmup_iterations=1,
                parallel_runs=1,
                timeout_seconds=300,
                collect_system_metrics=True,
                collect_detailed_metrics=False,
                output_format="json",
                output_file=None
            ),
            "standard": BenchmarkConfig(
                name="Standard Benchmark",
                description="Standard benchmark with moderate iterations",
                iterations=20,
                warmup_iterations=3,
                parallel_runs=2,
                timeout_seconds=600,
                collect_system_metrics=True,
                collect_detailed_metrics=True,
                output_format="html",
                output_file=None
            ),
            "comprehensive": BenchmarkConfig(
                name="Comprehensive Benchmark",
                description="Comprehensive benchmark with many iterations",
                iterations=100,
                warmup_iterations=10,
                parallel_runs=4,
                timeout_seconds=1800,
                collect_system_metrics=True,
                collect_detailed_metrics=True,
                output_format="html",
                output_file=None
            ),
            "stress": BenchmarkConfig(
                name="Stress Test",
                description="Stress test with high load",
                iterations=50,
                warmup_iterations=5,
                parallel_runs=8,
                timeout_seconds=1200,
                collect_system_metrics=True,
                collect_detailed_metrics=True,
                output_format="json",
                output_file=None
            )
        }

        return templates

    def _load_performance_baselines(self):
        """Load performance baselines for comparison"""
        baseline_file = os.path.join(
            getattr(self.config, 'performance_metrics_dir', './performance-metrics'),
            "baselines.json"
        )
        if os.path.exists(baseline_file):
            try:
                with open(baseline_file, 'r') as f:
                    self._performance_baselines = json.load(f)
                self.logger.info("Loaded performance baselines for benchmarking")
            except Exception as e:
                self.logger.warning(f"Failed to load baselines: {e}")

    def run_benchmark(self, benchmark_name: str, operation_func: Callable,
                      operation_args: Tuple = (), operation_kwargs: Optional[Dict] = None,
                      config: Optional[BenchmarkConfig] = None) -> BenchmarkResult:
        """Run a benchmark for a specific operation"""
        if operation_kwargs is None:
            operation_kwargs = {}

        # Use template if no config provided
        if config is None:
            if benchmark_name in self._benchmark_templates:
                config = self._benchmark_templates[benchmark_name]
            else:
                config = self._benchmark_templates["standard"]

        self.logger.info(f"Starting benchmark: {benchmark_name}")
        self.logger.info(f"Configuration: {config.iterations} iterations, {config.parallel_runs} parallel runs")

        start_time = datetime.now()
        results = []
        metrics_list = []

        # Warmup runs
        if config.warmup_iterations > 0:
            self.logger.info(f"Running {config.warmup_iterations} warmup iterations")
            for i in range(config.warmup_iterations):
                try:
                    operation_func(*operation_args, **operation_kwargs)
                except Exception as e:
                    self.logger.warning(f"Warmup iteration {i+1} failed: {e}")

        # Main benchmark runs
        self.logger.info(f"Running {config.iterations} benchmark iterations")

        if config.parallel_runs > 1:
            results = self._run_parallel_benchmark(operation_func, operation_args, operation_kwargs, config)
        else:
            results = self._run_sequential_benchmark(operation_func, operation_args, operation_kwargs, config)

        # Collect system metrics if enabled
        if config.collect_system_metrics:
            metrics_list = self._collect_benchmark_metrics(results, config)

        # Calculate statistics
        durations = [r["duration"] for r in results if r["success"]]
        successful_iterations = len(durations)
        failed_iterations = len(results) - successful_iterations

        if not durations:
            raise PerformanceError("No successful benchmark iterations")

        # Calculate performance statistics
        stats = self._calculate_performance_statistics(durations)

        # Calculate system impact
        system_impact = self._calculate_system_impact(metrics_list) if metrics_list else {}

        # Generate analysis and recommendations
        analysis = self._analyze_benchmark_results(stats, system_impact)
        recommendations = self._generate_benchmark_recommendations(analysis, stats)

        # Create benchmark result
        end_time = datetime.now()
        total_duration = (end_time - start_time).total_seconds()

        benchmark_result = BenchmarkResult(
            benchmark_name=benchmark_name,
            config=config,
            start_time=start_time,
            end_time=end_time,
            total_duration=total_duration,
            iterations=config.iterations,
            successful_iterations=successful_iterations,
            failed_iterations=failed_iterations,
            durations=durations,
            average_duration=stats["average"],
            min_duration=stats["min"],
            max_duration=stats["max"],
            median_duration=stats["median"],
            standard_deviation=stats["std_dev"],
            coefficient_of_variation=stats["cv"],
            percentiles=stats["percentiles"],
            system_impact=system_impact,
            metrics=metrics_list,
            analysis=analysis,
            recommendations=recommendations,
            system_info=self._system_info,
            benchmark_version="1.0.0"
        )

        # Store result
        self._benchmark_results[benchmark_name] = benchmark_result
        self._benchmark_history.append(benchmark_result)

        # Save result
        self._save_benchmark_result(benchmark_result)

        self.logger.info(f"Benchmark completed: {benchmark_name}")
        self.logger.info(f"Results: {successful_iterations}/{config.iterations} successful, "
                         f"avg duration: {stats['average']:.3f}s")

        return benchmark_result
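
    # Illustrative call (the names are placeholders, not part of this module):
    #   runner.run_benchmark("quick", build_package, operation_args=(source_dir,))
    # selects the "quick" template above and times build_package(source_dir).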

    def _run_sequential_benchmark(self, operation_func: Callable, operation_args: Tuple,
                                  operation_kwargs: Dict, config: BenchmarkConfig) -> List[Dict[str, Any]]:
        """Run benchmark iterations sequentially"""
        results = []

        for i in range(config.iterations):
            self.logger.debug(f"Running iteration {i+1}/{config.iterations}")

            try:
                start_time = time.time()
                result = operation_func(*operation_args, **operation_kwargs)
                end_time = time.time()

                iteration_result = {
                    "iteration": i + 1,
                    "success": True,
                    "duration": end_time - start_time,
                    "result": result,
                    "timestamp": datetime.now()
                }

                results.append(iteration_result)

            except Exception as e:
                self.logger.warning(f"Iteration {i+1} failed: {e}")
                iteration_result = {
                    "iteration": i + 1,
                    "success": False,
                    "duration": 0,
                    "error": str(e),
                    "timestamp": datetime.now()
                }
                results.append(iteration_result)

        return results

    def _run_parallel_benchmark(self, operation_func: Callable, operation_args: Tuple,
                                operation_kwargs: Dict, config: BenchmarkConfig) -> List[Dict[str, Any]]:
        """Run benchmark iterations in parallel"""
        results = []

        def run_iteration(iteration_num):
            try:
                start_time = time.time()
                result = operation_func(*operation_args, **operation_kwargs)
                end_time = time.time()

                return {
                    "iteration": iteration_num,
                    "success": True,
                    "duration": end_time - start_time,
                    "result": result,
                    "timestamp": datetime.now()
                }

            except Exception as e:
                self.logger.warning(f"Iteration {iteration_num} failed: {e}")
                return {
                    "iteration": iteration_num,
                    "success": False,
                    "duration": 0,
                    "error": str(e),
                    "timestamp": datetime.now()
                }
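
        # Note: thread-based parallelism is used here on the assumption that
        # the benchmarked operations spend most of their time in subprocesses
        # or I/O rather than in Python bytecode; for CPU-bound Python work the
        # GIL would serialize these workers and a process pool would likely be
        # the better fit.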

        # Use ThreadPoolExecutor for parallel execution
        with ThreadPoolExecutor(max_workers=config.parallel_runs) as executor:
            future_to_iteration = {
                executor.submit(run_iteration, i + 1): i + 1
                for i in range(config.iterations)
            }

            for future in as_completed(future_to_iteration):
                result = future.result()
                results.append(result)

        # Sort results by iteration number
        results.sort(key=lambda x: x["iteration"])
        return results

    def _collect_benchmark_metrics(self, results: List[Dict[str, Any]],
                                   config: BenchmarkConfig) -> List[BenchmarkMetrics]:
        """Collect a snapshot of system metrics for each successful iteration"""
        metrics_list = []

        for result in results:
            if not result["success"]:
                continue

            try:
                # Collect system metrics
                cpu_percent = psutil.cpu_percent(interval=0.1)
                memory = psutil.virtual_memory()
                disk_io = psutil.disk_io_counters()
                net_io = psutil.net_io_counters()

                # Get load average if available
                try:
                    load_avg = os.getloadavg()
                except (OSError, AttributeError):
                    load_avg = (0.0, 0.0, 0.0)

                # Get disk usage
                disk_usage = psutil.disk_usage('/')

                # Get network connections count
                try:
                    net_connections = len(psutil.net_connections())
                except (OSError, psutil.AccessDenied):
                    net_connections = 0

                metrics = BenchmarkMetrics(
                    timestamp=result["timestamp"],
                    duration=result["duration"],
                    cpu_percent=cpu_percent,
                    memory_mb=memory.used / (1024 * 1024),
                    disk_io_read_mb=disk_io.read_bytes / (1024 * 1024) if disk_io else 0,
                    disk_io_write_mb=disk_io.write_bytes / (1024 * 1024) if disk_io else 0,
                    network_io_mb=(net_io.bytes_sent + net_io.bytes_recv) / (1024 * 1024) if net_io else 0,
                    chroot_size_mb=0,  # Would need to be calculated from actual chroot
                    cache_hit_rate=0.0,  # Would need to be calculated from cache metrics
                    parallel_efficiency=1.0,  # Would need to be calculated
                    resource_utilization=0.0,  # Would need to be calculated
                    system_cpu_percent=cpu_percent,
                    system_memory_percent=memory.percent,
                    system_load_average=load_avg,
                    system_disk_usage_percent=disk_usage.percent,
                    system_network_connections=net_connections
                )

                metrics_list.append(metrics)

            except Exception as e:
                self.logger.warning(f"Failed to collect metrics for iteration {result['iteration']}: {e}")

        return metrics_list

    def _calculate_performance_statistics(self, durations: List[float]) -> Dict[str, Any]:
        """Calculate comprehensive performance statistics"""
        if not durations:
            return {}

        # Basic statistics
        avg_duration = statistics.mean(durations)
        min_duration = min(durations)
        max_duration = max(durations)
        median_duration = statistics.median(durations)

        # Standard deviation and coefficient of variation
        try:
            std_dev = statistics.stdev(durations)
            cv = std_dev / avg_duration if avg_duration > 0 else 0
        except statistics.StatisticsError:
            std_dev = 0
            cv = 0

        # Percentiles (simple index-based approximation on the sorted samples)
        sorted_durations = sorted(durations)
        percentiles = {
            "p10": sorted_durations[int(0.1 * len(sorted_durations))],
            "p25": sorted_durations[int(0.25 * len(sorted_durations))],
            "p50": sorted_durations[int(0.5 * len(sorted_durations))],
            "p75": sorted_durations[int(0.75 * len(sorted_durations))],
            "p90": sorted_durations[int(0.9 * len(sorted_durations))],
            "p95": sorted_durations[int(0.95 * len(sorted_durations))],
            "p99": sorted_durations[int(0.99 * len(sorted_durations))]
        }

        return {
            "average": avg_duration,
            "min": min_duration,
            "max": max_duration,
            "median": median_duration,
            "std_dev": std_dev,
            "cv": cv,
            "percentiles": percentiles,
            "durations": durations
        }
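
    # Possible refinement (not required by callers): for two or more samples,
    # statistics.quantiles(durations, n=100) on Python 3.8+ yields interpolated
    # percentile cut points that could replace the index lookups used above.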

    def _calculate_system_impact(self, metrics_list: List[BenchmarkMetrics]) -> Dict[str, float]:
        """Calculate system impact during benchmarking"""
        if not metrics_list:
            return {}

        # Calculate averages across all metrics
        avg_cpu = statistics.mean(m.cpu_percent for m in metrics_list)
        avg_memory = statistics.mean(m.memory_mb for m in metrics_list)
        avg_disk_read = statistics.mean(m.disk_io_read_mb for m in metrics_list)
        avg_disk_write = statistics.mean(m.disk_io_write_mb for m in metrics_list)
        avg_network = statistics.mean(m.network_io_mb for m in metrics_list)

        # Calculate peak values
        peak_cpu = max(m.cpu_percent for m in metrics_list)
        peak_memory = max(m.memory_mb for m in metrics_list)

        return {
            "avg_cpu_percent": avg_cpu,
            "avg_memory_mb": avg_memory,
            "avg_disk_read_mb": avg_disk_read,
            "avg_disk_write_mb": avg_disk_write,
            "avg_network_mb": avg_network,
            "peak_cpu_percent": peak_cpu,
            "peak_memory_mb": peak_memory
        }

    def _analyze_benchmark_results(self, stats: Dict[str, Any],
                                   system_impact: Dict[str, float]) -> Dict[str, Any]:
        """Analyze benchmark results for insights"""
        analysis = {
            "performance_stability": "unknown",
            "system_impact_level": "unknown",
            "optimization_opportunities": [],
            "anomalies": []
        }

        # Analyze performance stability
        cv = stats.get("cv", 0)
        if cv < 0.1:
            analysis["performance_stability"] = "excellent"
        elif cv < 0.2:
            analysis["performance_stability"] = "good"
        elif cv < 0.3:
            analysis["performance_stability"] = "fair"
        else:
            analysis["performance_stability"] = "poor"
            analysis["optimization_opportunities"].append("High performance variability detected")

        # Analyze system impact
        avg_cpu = system_impact.get("avg_cpu_percent", 0)
        avg_memory = system_impact.get("avg_memory_mb", 0)

        if avg_cpu < 30:
            analysis["system_impact_level"] = "low"
            analysis["optimization_opportunities"].append("CPU utilization is low, consider increasing parallelization")
        elif avg_cpu < 70:
            analysis["system_impact_level"] = "moderate"
        else:
            analysis["system_impact_level"] = "high"
            analysis["optimization_opportunities"].append("High CPU utilization, consider reducing load")

        if avg_memory > 2048:  # 2GB
            analysis["optimization_opportunities"].append("High memory usage, consider optimizing memory allocation")

        # Detect anomalies
        durations = stats.get("durations", [])
        if durations:
            avg_duration = stats.get("average", 0)
            std_dev = stats.get("std_dev", 0)
            for duration in durations:
                if abs(duration - avg_duration) > 2 * std_dev:
                    analysis["anomalies"].append(f"Duration anomaly: {duration:.3f}s (avg: {avg_duration:.3f}s)")

        return analysis

    def _generate_benchmark_recommendations(self, analysis: Dict[str, Any],
                                            stats: Dict[str, Any]) -> List[str]:
        """Generate actionable recommendations based on benchmark results"""
        recommendations = []

        # Performance stability recommendations
        stability = analysis.get("performance_stability", "unknown")
        if stability in ["fair", "poor"]:
            recommendations.append("Investigate performance variability - check for external factors affecting performance")
            recommendations.append("Consider running more iterations to get more stable results")

        # System impact recommendations
        impact_level = analysis.get("system_impact_level", "unknown")
        if impact_level == "low":
            recommendations.append("System resources are underutilized - consider increasing workload or parallelization")
        elif impact_level == "high":
            recommendations.append("System is under high load - consider reducing workload or optimizing operations")

        # Optimization recommendations
        for opportunity in analysis.get("optimization_opportunities", []):
            recommendations.append(opportunity)

        # General recommendations
        if stats.get("cv", 0) > 0.2:
            recommendations.append("High coefficient of variation suggests inconsistent performance - investigate root causes")

        if len(recommendations) == 0:
            recommendations.append("Performance is within acceptable parameters - continue monitoring")

        return recommendations

    def _save_benchmark_result(self, result: BenchmarkResult):
        """Save benchmark result to file"""
        try:
            metrics_dir = getattr(self.config, 'performance_metrics_dir', './performance-metrics')
            os.makedirs(metrics_dir, exist_ok=True)

            timestamp = result.start_time.strftime("%Y%m%d_%H%M%S")
            filename = f"benchmark_{result.benchmark_name}_{timestamp}.json"
            filepath = os.path.join(metrics_dir, filename)

            # Convert to dict for JSON serialization
            result_dict = asdict(result)
            result_dict["start_time"] = result.start_time.isoformat()
            result_dict["end_time"] = result.end_time.isoformat()

            with open(filepath, 'w') as f:
                json.dump(result_dict, f, indent=2, default=str)

            self.logger.info(f"Benchmark result saved: {filepath}")

        except Exception as e:
            self.logger.error(f"Failed to save benchmark result: {e}")

    def compare_benchmarks(self, benchmark_names: List[str]) -> Dict[str, Any]:
        """Compare multiple benchmark results"""
        if len(benchmark_names) < 2:
            raise ValueError("Need at least 2 benchmark names for comparison")

        comparison = {
            "benchmarks": benchmark_names,
            "comparison_date": datetime.now().isoformat(),
            "results": {},
            "analysis": {},
            "recommendations": []
        }

        # Collect benchmark results
        for name in benchmark_names:
            if name in self._benchmark_results:
                result = self._benchmark_results[name]
                comparison["results"][name] = {
                    "average_duration": result.average_duration,
                    "min_duration": result.min_duration,
                    "max_duration": result.max_duration,
                    "standard_deviation": result.standard_deviation,
                    "coefficient_of_variation": result.coefficient_of_variation,
                    "successful_iterations": result.successful_iterations,
                    "total_iterations": result.iterations
                }

        # Perform comparison analysis
        if len(comparison["results"]) >= 2:
            comparison["analysis"] = self._analyze_benchmark_comparison(comparison["results"])
            comparison["recommendations"] = self._generate_comparison_recommendations(comparison["analysis"])

        return comparison
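
    # Illustrative usage (assumes two named runs have already been stored on
    # this runner, e.g. via run_benchmark("quick", some_operation) and
    # run_benchmark("standard", some_operation); `some_operation` is a
    # placeholder callable):
    #
    #   report = runner.compare_benchmarks(["quick", "standard"])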

    def _analyze_benchmark_comparison(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze comparison between benchmark results"""
        analysis = {
            "fastest_benchmark": None,
            "slowest_benchmark": None,
            "most_stable_benchmark": None,
            "least_stable_benchmark": None,
            "performance_differences": {},
            "stability_differences": {}
        }

        if len(results) < 2:
            return analysis

        # Find fastest and slowest
        avg_durations = {name: data["average_duration"] for name, data in results.items()}
        fastest = min(avg_durations, key=avg_durations.get)
        slowest = max(avg_durations, key=avg_durations.get)

        analysis["fastest_benchmark"] = fastest
        analysis["slowest_benchmark"] = slowest

        # Find most and least stable
        cv_values = {name: data["coefficient_of_variation"] for name, data in results.items()}
        most_stable = min(cv_values, key=cv_values.get)
        least_stable = max(cv_values, key=cv_values.get)

        analysis["most_stable_benchmark"] = most_stable
        analysis["least_stable_benchmark"] = least_stable

        # Calculate performance differences
        fastest_avg = avg_durations[fastest]
        for name, data in results.items():
            if name != fastest:
                diff_percent = ((data["average_duration"] - fastest_avg) / fastest_avg) * 100
                analysis["performance_differences"][name] = {
                    "vs_fastest_percent": diff_percent,
                    "vs_fastest_seconds": data["average_duration"] - fastest_avg
                }

        # Calculate stability differences
        most_stable_cv = cv_values[most_stable]
        for name, data in results.items():
            if name != most_stable:
                cv_diff = data["coefficient_of_variation"] - most_stable_cv
                analysis["stability_differences"][name] = {
                    "vs_most_stable_cv": cv_diff,
                    "stability_ratio": (data["coefficient_of_variation"] / most_stable_cv
                                        if most_stable_cv > 0 else float("inf"))
                }

        return analysis

    def _generate_comparison_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
        """Generate recommendations based on benchmark comparison"""
        recommendations = []

        fastest = analysis.get("fastest_benchmark")
        slowest = analysis.get("slowest_benchmark")
        most_stable = analysis.get("most_stable_benchmark")
        least_stable = analysis.get("least_stable_benchmark")

        if fastest and slowest and fastest != slowest:
            diff_percent = analysis["performance_differences"][slowest]["vs_fastest_percent"]
            recommendations.append(f"Benchmark '{slowest}' is {diff_percent:.1f}% slower than '{fastest}' - investigate performance differences")

        if most_stable and least_stable and most_stable != least_stable:
            stability_ratio = analysis["stability_differences"][least_stable]["stability_ratio"]
            recommendations.append(f"Benchmark '{least_stable}' is {stability_ratio:.2f}x less stable than '{most_stable}' - investigate variability causes")

        # General recommendations
        if len(analysis.get("performance_differences", {})) > 0:
            recommendations.append("Consider using the fastest benchmark configuration for production")

        if len(analysis.get("stability_differences", {})) > 0:
            recommendations.append("Consider using the most stable benchmark configuration for critical operations")

        return recommendations

    def list_benchmarks(self) -> List[str]:
        """List all available benchmark templates"""
        return list(self._benchmark_templates.keys())

    def get_benchmark_result(self, benchmark_name: str) -> Optional[BenchmarkResult]:
        """Get a specific benchmark result"""
        return self._benchmark_results.get(benchmark_name)

    def get_benchmark_history(self) -> List[BenchmarkResult]:
        """Get all benchmark results"""
        return self._benchmark_history.copy()

    def clear_benchmark_history(self):
        """Clear benchmark history"""
        self._benchmark_history.clear()
        self._benchmark_results.clear()
        self.logger.info("Benchmark history cleared")
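

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the public API).  It assumes
# the config object only needs the optional `performance_metrics_dir`
# attribute read via getattr() above; adapt it to the real deb-mock
# configuration object.  Because of the relative import at the top of this
# file, run it as a module, e.g. `python -m <package>.benchmark`.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    runner = BenchmarkRunner(SimpleNamespace(performance_metrics_dir="./performance-metrics"))

    def sample_operation():
        # Stand-in workload; replace with a real deb-mock operation.
        time.sleep(0.01)
        return "ok"

    result = runner.run_benchmark("quick", sample_operation)
    print(f"{result.successful_iterations}/{result.iterations} iterations succeeded, "
          f"average duration {result.average_duration:.3f}s")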