""" Advanced benchmarking system for deb-mock """ import time import psutil import threading import json import os import statistics import subprocess from pathlib import Path from typing import Dict, List, Any, Optional, Callable, Tuple from contextlib import contextmanager from dataclasses import dataclass, asdict from datetime import datetime, timedelta import logging from concurrent.futures import ThreadPoolExecutor, as_completed import multiprocessing from .exceptions import PerformanceError @dataclass class BenchmarkConfig: """Configuration for benchmarking""" name: str description: str iterations: int warmup_iterations: int parallel_runs: int timeout_seconds: int collect_system_metrics: bool collect_detailed_metrics: bool output_format: str # json, html, csv output_file: Optional[str] @dataclass class BenchmarkMetrics: """Metrics collected during benchmarking""" timestamp: datetime duration: float cpu_percent: float memory_mb: float disk_io_read_mb: float disk_io_write_mb: float network_io_mb: float chroot_size_mb: float cache_hit_rate: float parallel_efficiency: float resource_utilization: float # System-level metrics system_cpu_percent: float system_memory_percent: float system_load_average: Tuple[float, float, float] system_disk_usage_percent: float system_network_connections: int @dataclass class BenchmarkResult: """Result of a benchmark run""" benchmark_name: str config: BenchmarkConfig start_time: datetime end_time: datetime total_duration: float iterations: int successful_iterations: int failed_iterations: int # Performance statistics durations: List[float] average_duration: float min_duration: float max_duration: float median_duration: float standard_deviation: float coefficient_of_variation: float # Percentiles percentiles: Dict[str, float] # System impact system_impact: Dict[str, float] # Detailed metrics metrics: List[BenchmarkMetrics] # Analysis analysis: Dict[str, Any] recommendations: List[str] # Metadata system_info: Dict[str, Any] benchmark_version: str class BenchmarkRunner: """Advanced benchmark runner for deb-mock operations""" def __init__(self, config): self.config = config self.logger = logging.getLogger(__name__) # Benchmark history self._benchmark_history = [] self._benchmark_results = {} # System information self._system_info = self._collect_system_info() # Benchmark templates self._benchmark_templates = self._load_benchmark_templates() # Performance baselines self._performance_baselines = {} self._load_performance_baselines() def _collect_system_info(self) -> Dict[str, Any]: """Collect comprehensive system information""" try: # CPU information cpu_info = { "count": psutil.cpu_count(), "count_logical": psutil.cpu_count(logical=True), "freq": psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None, "architecture": os.uname().machine if hasattr(os, 'uname') else "unknown" } # Memory information memory = psutil.virtual_memory() memory_info = { "total_gb": memory.total / (1024**3), "available_gb": memory.available / (1024**3), "percent": memory.percent } # Disk information disk = psutil.disk_usage('/') disk_info = { "total_gb": disk.total / (1024**3), "free_gb": disk.free / (1024**3), "percent": disk.percent } # OS information os_info = { "platform": os.uname().sysname if hasattr(os, 'uname') else "unknown", "release": os.uname().release if hasattr(os, 'uname') else "unknown", "version": os.uname().version if hasattr(os, 'uname') else "unknown" } # Python information python_info = { "version": 
f"{os.sys.version_info.major}.{os.sys.version_info.minor}.{os.sys.version_info.micro}", "implementation": os.sys.implementation.name, "platform": os.sys.platform } return { "cpu": cpu_info, "memory": memory_info, "disk": disk_info, "os": os_info, "python": python_info, "timestamp": datetime.now().isoformat() } except Exception as e: self.logger.error(f"Failed to collect system info: {e}") return {"error": str(e)} def _load_benchmark_templates(self) -> Dict[str, BenchmarkConfig]: """Load predefined benchmark templates""" templates = { "quick": BenchmarkConfig( name="Quick Benchmark", description="Fast benchmark with minimal iterations", iterations=5, warmup_iterations=1, parallel_runs=1, timeout_seconds=300, collect_system_metrics=True, collect_detailed_metrics=False, output_format="json", output_file=None ), "standard": BenchmarkConfig( name="Standard Benchmark", description="Standard benchmark with moderate iterations", iterations=20, warmup_iterations=3, parallel_runs=2, timeout_seconds=600, collect_system_metrics=True, collect_detailed_metrics=True, output_format="html", output_file=None ), "comprehensive": BenchmarkConfig( name="Comprehensive Benchmark", description="Comprehensive benchmark with many iterations", iterations=100, warmup_iterations=10, parallel_runs=4, timeout_seconds=1800, collect_system_metrics=True, collect_detailed_metrics=True, output_format="html", output_file=None ), "stress": BenchmarkConfig( name="Stress Test", description="Stress test with high load", iterations=50, warmup_iterations=5, parallel_runs=8, timeout_seconds=1200, collect_system_metrics=True, collect_detailed_metrics=True, output_format="json", output_file=None ) } return templates def _load_performance_baselines(self): """Load performance baselines for comparison""" baseline_file = os.path.join(getattr(self.config, 'performance_metrics_dir', './performance-metrics'), "baselines.json") if os.path.exists(baseline_file): try: with open(baseline_file, 'r') as f: self._performance_baselines = json.load(f) self.logger.info("Loaded performance baselines for benchmarking") except Exception as e: self.logger.warning(f"Failed to load baselines: {e}") def run_benchmark(self, benchmark_name: str, operation_func: Callable, operation_args: Tuple = (), operation_kwargs: Dict = None, config: Optional[BenchmarkConfig] = None) -> BenchmarkResult: """Run a benchmark for a specific operation""" if operation_kwargs is None: operation_kwargs = {} # Use template if no config provided if config is None: if benchmark_name in self._benchmark_templates: config = self._benchmark_templates[benchmark_name] else: config = self._benchmark_templates["standard"] self.logger.info(f"Starting benchmark: {benchmark_name}") self.logger.info(f"Configuration: {iterations} iterations, {parallel_runs} parallel runs") start_time = datetime.now() results = [] metrics_list = [] # Warmup runs if config.warmup_iterations > 0: self.logger.info(f"Running {config.warmup_iterations} warmup iterations") for i in range(config.warmup_iterations): try: operation_func(*operation_args, **operation_kwargs) except Exception as e: self.logger.warning(f"Warmup iteration {i+1} failed: {e}") # Main benchmark runs self.logger.info(f"Running {config.iterations} benchmark iterations") if config.parallel_runs > 1: results = self._run_parallel_benchmark(operation_func, operation_args, operation_kwargs, config) else: results = self._run_sequential_benchmark(operation_func, operation_args, operation_kwargs, config) # Collect system metrics if enabled if 
        if config.collect_system_metrics:
            metrics_list = self._collect_benchmark_metrics(results, config)

        # Calculate statistics
        durations = [r["duration"] for r in results if r["success"]]
        successful_iterations = len(durations)
        failed_iterations = len(results) - successful_iterations

        if not durations:
            raise PerformanceError("No successful benchmark iterations")

        # Calculate performance statistics
        stats = self._calculate_performance_statistics(durations)

        # Calculate system impact
        system_impact = self._calculate_system_impact(metrics_list) if metrics_list else {}

        # Generate analysis and recommendations
        analysis = self._analyze_benchmark_results(stats, system_impact)
        recommendations = self._generate_benchmark_recommendations(analysis, stats)

        # Create benchmark result
        end_time = datetime.now()
        total_duration = (end_time - start_time).total_seconds()

        benchmark_result = BenchmarkResult(
            benchmark_name=benchmark_name,
            config=config,
            start_time=start_time,
            end_time=end_time,
            total_duration=total_duration,
            iterations=config.iterations,
            successful_iterations=successful_iterations,
            failed_iterations=failed_iterations,
            durations=durations,
            average_duration=stats["average"],
            min_duration=stats["min"],
            max_duration=stats["max"],
            median_duration=stats["median"],
            standard_deviation=stats["std_dev"],
            coefficient_of_variation=stats["cv"],
            percentiles=stats["percentiles"],
            system_impact=system_impact,
            metrics=metrics_list,
            analysis=analysis,
            recommendations=recommendations,
            system_info=self._system_info,
            benchmark_version="1.0.0",
        )

        # Store result
        self._benchmark_results[benchmark_name] = benchmark_result
        self._benchmark_history.append(benchmark_result)

        # Save result
        self._save_benchmark_result(benchmark_result)

        self.logger.info(f"Benchmark completed: {benchmark_name}")
        self.logger.info(
            f"Results: {successful_iterations}/{config.iterations} successful, "
            f"avg duration: {stats['average']:.3f}s"
        )

        return benchmark_result

    def _run_sequential_benchmark(self, operation_func: Callable, operation_args: Tuple,
                                  operation_kwargs: Dict, config: BenchmarkConfig) -> List[Dict[str, Any]]:
        """Run benchmark iterations sequentially."""
        results = []

        for i in range(config.iterations):
            self.logger.debug(f"Running iteration {i+1}/{config.iterations}")

            try:
                start_time = time.time()
                result = operation_func(*operation_args, **operation_kwargs)
                end_time = time.time()

                iteration_result = {
                    "iteration": i + 1,
                    "success": True,
                    "duration": end_time - start_time,
                    "result": result,
                    "timestamp": datetime.now(),
                }
                results.append(iteration_result)
            except Exception as e:
                self.logger.warning(f"Iteration {i+1} failed: {e}")
                iteration_result = {
                    "iteration": i + 1,
                    "success": False,
                    "duration": 0,
                    "error": str(e),
                    "timestamp": datetime.now(),
                }
                results.append(iteration_result)

        return results

    def _run_parallel_benchmark(self, operation_func: Callable, operation_args: Tuple,
                                operation_kwargs: Dict, config: BenchmarkConfig) -> List[Dict[str, Any]]:
        """Run benchmark iterations in parallel."""
        results = []

        def run_iteration(iteration_num):
            try:
                start_time = time.time()
                result = operation_func(*operation_args, **operation_kwargs)
                end_time = time.time()

                return {
                    "iteration": iteration_num,
                    "success": True,
                    "duration": end_time - start_time,
                    "result": result,
                    "timestamp": datetime.now(),
                }
            except Exception as e:
                self.logger.warning(f"Iteration {iteration_num} failed: {e}")
                return {
                    "iteration": iteration_num,
                    "success": False,
                    "duration": 0,
                    "error": str(e),
                    "timestamp": datetime.now(),
                }

        # Use ThreadPoolExecutor for parallel execution
        with ThreadPoolExecutor(max_workers=config.parallel_runs) as executor:
            future_to_iteration = {
                executor.submit(run_iteration, i + 1): i + 1
                for i in range(config.iterations)
            }

            for future in as_completed(future_to_iteration):
                result = future.result()
                results.append(result)

        # Sort results by iteration number
        results.sort(key=lambda x: x["iteration"])

        return results

    def _collect_benchmark_metrics(self, results: List[Dict[str, Any]],
                                   config: BenchmarkConfig) -> List[BenchmarkMetrics]:
        """Collect system metrics during benchmarking."""
        metrics_list = []

        for result in results:
            if not result["success"]:
                continue

            try:
                # Collect system metrics
                cpu_percent = psutil.cpu_percent(interval=0.1)
                memory = psutil.virtual_memory()
                disk_io = psutil.disk_io_counters()
                net_io = psutil.net_io_counters()

                # Get load average if available
                try:
                    load_avg = os.getloadavg()
                except (OSError, AttributeError):
                    load_avg = (0.0, 0.0, 0.0)

                # Get disk usage
                disk_usage = psutil.disk_usage('/')

                # Get network connections count
                try:
                    net_connections = len(psutil.net_connections())
                except (OSError, psutil.AccessDenied):
                    net_connections = 0

                metrics = BenchmarkMetrics(
                    timestamp=result["timestamp"],
                    duration=result["duration"],
                    cpu_percent=cpu_percent,
                    memory_mb=memory.used / (1024 * 1024),
                    disk_io_read_mb=disk_io.read_bytes / (1024 * 1024) if disk_io else 0,
                    disk_io_write_mb=disk_io.write_bytes / (1024 * 1024) if disk_io else 0,
                    network_io_mb=(net_io.bytes_sent + net_io.bytes_recv) / (1024 * 1024) if net_io else 0,
                    chroot_size_mb=0,  # Would need to be calculated from actual chroot
                    cache_hit_rate=0.0,  # Would need to be calculated from cache metrics
                    parallel_efficiency=1.0,  # Would need to be calculated
                    resource_utilization=0.0,  # Would need to be calculated
                    system_cpu_percent=cpu_percent,
                    system_memory_percent=memory.percent,
                    system_load_average=load_avg,
                    system_disk_usage_percent=disk_usage.percent,
                    system_network_connections=net_connections,
                )
                metrics_list.append(metrics)
            except Exception as e:
                self.logger.warning(
                    f"Failed to collect metrics for iteration {result['iteration']}: {e}"
                )

        return metrics_list

    def _calculate_performance_statistics(self, durations: List[float]) -> Dict[str, Any]:
        """Calculate comprehensive performance statistics."""
        if not durations:
            return {}

        # Basic statistics
        avg_duration = statistics.mean(durations)
        min_duration = min(durations)
        max_duration = max(durations)
        median_duration = statistics.median(durations)

        # Standard deviation and coefficient of variation
        try:
            std_dev = statistics.stdev(durations)
            cv = std_dev / avg_duration if avg_duration > 0 else 0
        except statistics.StatisticsError:
            std_dev = 0
            cv = 0

        # Percentiles
        sorted_durations = sorted(durations)
        percentiles = {
            "p10": sorted_durations[int(0.1 * len(sorted_durations))],
            "p25": sorted_durations[int(0.25 * len(sorted_durations))],
            "p50": sorted_durations[int(0.5 * len(sorted_durations))],
            "p75": sorted_durations[int(0.75 * len(sorted_durations))],
            "p90": sorted_durations[int(0.9 * len(sorted_durations))],
            "p95": sorted_durations[int(0.95 * len(sorted_durations))],
            "p99": sorted_durations[int(0.99 * len(sorted_durations))],
        }

        return {
            "durations": durations,
            "average": avg_duration,
            "min": min_duration,
            "max": max_duration,
            "median": median_duration,
            "std_dev": std_dev,
            "cv": cv,
            "percentiles": percentiles,
        }

    def _calculate_system_impact(self, metrics_list: List[BenchmarkMetrics]) -> Dict[str, float]:
        """Calculate system impact during benchmarking."""
        if not metrics_list:
            return {}

        # Calculate averages across all metrics
        avg_cpu = statistics.mean(m.cpu_percent for m in metrics_list)
        avg_memory = statistics.mean(m.memory_mb for m in metrics_list)
        avg_disk_read = statistics.mean(m.disk_io_read_mb for m in metrics_list)
        avg_disk_write = statistics.mean(m.disk_io_write_mb for m in metrics_list)
        avg_network = statistics.mean(m.network_io_mb for m in metrics_list)

        # Calculate peak values
        peak_cpu = max(m.cpu_percent for m in metrics_list)
        peak_memory = max(m.memory_mb for m in metrics_list)

        return {
            "avg_cpu_percent": avg_cpu,
            "avg_memory_mb": avg_memory,
            "avg_disk_read_mb": avg_disk_read,
            "avg_disk_write_mb": avg_disk_write,
            "avg_network_mb": avg_network,
            "peak_cpu_percent": peak_cpu,
            "peak_memory_mb": peak_memory,
        }

    def _analyze_benchmark_results(self, stats: Dict[str, Any],
                                   system_impact: Dict[str, float]) -> Dict[str, Any]:
        """Analyze benchmark results for insights."""
        analysis = {
            "performance_stability": "unknown",
            "system_impact_level": "unknown",
            "optimization_opportunities": [],
            "anomalies": [],
        }

        # Analyze performance stability
        cv = stats.get("cv", 0)
        if cv < 0.1:
            analysis["performance_stability"] = "excellent"
        elif cv < 0.2:
            analysis["performance_stability"] = "good"
        elif cv < 0.3:
            analysis["performance_stability"] = "fair"
        else:
            analysis["performance_stability"] = "poor"
            analysis["optimization_opportunities"].append("High performance variability detected")

        # Analyze system impact
        avg_cpu = system_impact.get("avg_cpu_percent", 0)
        avg_memory = system_impact.get("avg_memory_mb", 0)

        if avg_cpu < 30:
            analysis["system_impact_level"] = "low"
            analysis["optimization_opportunities"].append(
                "CPU utilization is low, consider increasing parallelization"
            )
        elif avg_cpu < 70:
            analysis["system_impact_level"] = "moderate"
        else:
            analysis["system_impact_level"] = "high"
            analysis["optimization_opportunities"].append(
                "High CPU utilization, consider reducing load"
            )

        if avg_memory > 2048:  # 2 GB
            analysis["optimization_opportunities"].append(
                "High memory usage, consider optimizing memory allocation"
            )

        # Detect anomalies: durations more than two standard deviations from the mean
        durations = stats.get("durations", [])
        if durations:
            avg_duration = stats.get("average", 0)
            std_dev = stats.get("std_dev", 0)
            for duration in durations:
                if std_dev > 0 and abs(duration - avg_duration) > 2 * std_dev:
                    analysis["anomalies"].append(
                        f"Duration anomaly: {duration:.3f}s (avg: {avg_duration:.3f}s)"
                    )

        return analysis

    def _generate_benchmark_recommendations(self, analysis: Dict[str, Any],
                                            stats: Dict[str, Any]) -> List[str]:
        """Generate actionable recommendations based on benchmark results."""
        recommendations = []

        # Performance stability recommendations
        stability = analysis.get("performance_stability", "unknown")
        if stability in ["fair", "poor"]:
            recommendations.append(
                "Investigate performance variability - check for external factors affecting performance"
            )
            recommendations.append("Consider running more iterations to get more stable results")

        # System impact recommendations
        impact_level = analysis.get("system_impact_level", "unknown")
        if impact_level == "low":
            recommendations.append(
                "System resources are underutilized - consider increasing workload or parallelization"
            )
        elif impact_level == "high":
            recommendations.append(
                "System is under high load - consider reducing workload or optimizing operations"
            )

        # Optimization recommendations
        for opportunity in analysis.get("optimization_opportunities", []):
            recommendations.append(opportunity)

        # General recommendations
        if stats.get("cv", 0) > 0.2:
            recommendations.append(
                "High coefficient of variation suggests inconsistent performance - investigate root causes"
            )

        if len(recommendations) == 0:
            recommendations.append("Performance is within acceptable parameters - continue monitoring")
continue monitoring") return recommendations def _save_benchmark_result(self, result: BenchmarkResult): """Save benchmark result to file""" try: metrics_dir = getattr(self.config, 'performance_metrics_dir', './performance-metrics') os.makedirs(metrics_dir, exist_ok=True) timestamp = result.start_time.strftime("%Y%m%d_%H%M%S") filename = f"benchmark_{result.benchmark_name}_{timestamp}.json" filepath = os.path.join(metrics_dir, filename) # Convert to dict for JSON serialization result_dict = asdict(result) result_dict["start_time"] = result.start_time.isoformat() result_dict["end_time"] = result.end_time.isoformat() result_dict["timestamp"] = result.timestamp.isoformat() with open(filepath, 'w') as f: json.dump(result_dict, f, indent=2, default=str) self.logger.info(f"Benchmark result saved: {filepath}") except Exception as e: self.logger.error(f"Failed to save benchmark result: {e}") def compare_benchmarks(self, benchmark_names: List[str]) -> Dict[str, Any]: """Compare multiple benchmark results""" if len(benchmark_names) < 2: raise ValueError("Need at least 2 benchmark names for comparison") comparison = { "benchmarks": benchmark_names, "comparison_date": datetime.now().isoformat(), "results": {}, "analysis": {}, "recommendations": [] } # Collect benchmark results for name in benchmark_names: if name in self._benchmark_results: result = self._benchmark_results[name] comparison["results"][name] = { "average_duration": result.average_duration, "min_duration": result.min_duration, "max_duration": result.max_duration, "standard_deviation": result.standard_deviation, "coefficient_of_variation": result.coefficient_of_variation, "successful_iterations": result.successful_iterations, "total_iterations": result.iterations } # Perform comparison analysis if len(comparison["results"]) >= 2: comparison["analysis"] = self._analyze_benchmark_comparison(comparison["results"]) comparison["recommendations"] = self._generate_comparison_recommendations(comparison["analysis"]) return comparison def _analyze_benchmark_comparison(self, results: Dict[str, Any]) -> Dict[str, Any]: """Analyze comparison between benchmark results""" analysis = { "fastest_benchmark": None, "slowest_benchmark": None, "most_stable_benchmark": None, "least_stable_benchmark": None, "performance_differences": {}, "stability_differences": {} } if len(results) < 2: return analysis # Find fastest and slowest avg_durations = {name: data["average_duration"] for name, data in results.items()} fastest = min(avg_durations, key=avg_durations.get) slowest = max(avg_durations, key=avg_durations.get) analysis["fastest_benchmark"] = fastest analysis["slowest_benchmark"] = slowest # Find most and least stable cv_values = {name: data["coefficient_of_variation"] for name, data in results.items()} most_stable = min(cv_values, key=cv_values.get) least_stable = max(cv_values, key=cv_values.get) analysis["most_stable_benchmark"] = most_stable analysis["least_stable_benchmark"] = least_stable # Calculate performance differences fastest_avg = avg_durations[fastest] for name, data in results.items(): if name != fastest: diff_percent = ((data["average_duration"] - fastest_avg) / fastest_avg) * 100 analysis["performance_differences"][name] = { "vs_fastest_percent": diff_percent, "vs_fastest_seconds": data["average_duration"] - fastest_avg } # Calculate stability differences most_stable_cv = cv_values[most_stable] for name, data in results.items(): if name != most_stable: cv_diff = data["coefficient_of_variation"] - most_stable_cv 
analysis["stability_differences"][name] = { "vs_most_stable_cv": cv_diff, "stability_ratio": data["coefficient_of_variation"] / most_stable_cv } return analysis def _generate_comparison_recommendations(self, analysis: Dict[str, Any]) -> List[str]: """Generate recommendations based on benchmark comparison""" recommendations = [] fastest = analysis.get("fastest_benchmark") slowest = analysis.get("slowest_benchmark") most_stable = analysis.get("most_stable_benchmark") least_stable = analysis.get("least_stable_benchmark") if fastest and slowest and fastest != slowest: fastest_avg = analysis["performance_differences"][slowest]["vs_fastest_percent"] recommendations.append(f"Benchmark '{slowest}' is {fastest_avg:.1f}% slower than '{fastest}' - investigate performance differences") if most_stable and least_stable and most_stable != least_stable: stability_ratio = analysis["stability_differences"][least_stable]["stability_ratio"] recommendations.append(f"Benchmark '{least_stable}' is {stability_ratio:.2f}x less stable than '{most_stable}' - investigate variability causes") # General recommendations if len(analysis.get("performance_differences", {})) > 0: recommendations.append("Consider using the fastest benchmark configuration for production") if len(analysis.get("stability_differences", {})) > 0: recommendations.append("Consider using the most stable benchmark configuration for critical operations") return recommendations def list_benchmarks(self) -> List[str]: """List all available benchmark templates""" return list(self._benchmark_templates.keys()) def get_benchmark_result(self, benchmark_name: str) -> Optional[BenchmarkResult]: """Get a specific benchmark result""" return self._benchmark_results.get(benchmark_name) def get_benchmark_history(self) -> List[BenchmarkResult]: """Get all benchmark results""" return self._benchmark_history.copy() def clear_benchmark_history(self): """Clear benchmark history""" self._benchmark_history.clear() self._benchmark_results.clear() self.logger.info("Benchmark history cleared")