#!/usr/bin/env python3
"""
Debian Forge Production Optimization Module

This module provides performance optimization, load testing, and production
monitoring capabilities for the Debian Forge system.
"""

import json
import random
import sqlite3
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class PerformanceMetrics:
    """A single snapshot of system performance metrics."""
    timestamp: float
    cpu_usage: float
    memory_usage: float
    disk_io: float
    network_io: float
    active_builds: int
    queue_length: int
    response_time: float


@dataclass
class LoadTestResult:
    """Result of a load test run."""
    test_name: str
    concurrent_users: int
    total_requests: int
    successful_requests: int
    failed_requests: int
    average_response_time: float
    max_response_time: float
    min_response_time: float
    throughput: float  # requests per second
    error_rate: float  # percent


class ProductionOptimization:
    """Production optimization and monitoring for Debian Forge"""

    def __init__(self, metrics_db: str = "production_metrics.db"):
        self.metrics_db = metrics_db
        self._init_metrics_db()
        self.monitoring_active = False
        self.monitoring_thread: Optional[threading.Thread] = None

    def _init_metrics_db(self):
        """Create the metrics tables if they do not already exist."""
        conn = sqlite3.connect(self.metrics_db)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS performance_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                cpu_usage REAL NOT NULL,
                memory_usage REAL NOT NULL,
                disk_io REAL NOT NULL,
                network_io REAL NOT NULL,
                active_builds INTEGER NOT NULL,
                queue_length INTEGER NOT NULL,
                response_time REAL NOT NULL
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS load_tests (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                test_name TEXT NOT NULL,
                concurrent_users INTEGER NOT NULL,
                total_requests INTEGER NOT NULL,
                successful_requests INTEGER NOT NULL,
                failed_requests INTEGER NOT NULL,
                average_response_time REAL NOT NULL,
                max_response_time REAL NOT NULL,
                min_response_time REAL NOT NULL,
                throughput REAL NOT NULL,
                error_rate REAL NOT NULL
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS optimization_recommendations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                category TEXT NOT NULL,
                description TEXT NOT NULL,
                priority TEXT NOT NULL,
                impact TEXT NOT NULL,
                implementation_effort TEXT NOT NULL,
                status TEXT DEFAULT 'pending'
            )
        """)
        conn.commit()
        conn.close()

    def start_performance_monitoring(self, interval_seconds: int = 30) -> bool:
        """Start continuous performance monitoring in a background thread."""
        if self.monitoring_active:
            return False
        self.monitoring_active = True
        self.monitoring_thread = threading.Thread(
            target=self._monitoring_loop,
            args=(interval_seconds,),
            daemon=True
        )
        self.monitoring_thread.start()
        return True

    def stop_performance_monitoring(self):
        """Stop monitoring; join may wait up to one interval for the loop's sleep."""
        self.monitoring_active = False
        if self.monitoring_thread:
            self.monitoring_thread.join()

    def _monitoring_loop(self, interval_seconds: int):
        """Main monitoring loop: collect, store, sleep, repeat."""
        while self.monitoring_active:
            try:
                metrics = self._collect_performance_metrics()
                self._store_performance_metrics(metrics)
                time.sleep(interval_seconds)
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(interval_seconds)

    def _collect_performance_metrics(self) -> PerformanceMetrics:
        """Collect current performance metrics.

        These values are simulated for demonstration; in production they
        would come from actual system monitoring.
        """
        current_time = time.time()
        cpu_usage = random.uniform(20.0, 80.0)         # CPU usage (0-100%)
        memory_usage = random.uniform(30.0, 90.0)      # memory usage (0-100%)
        disk_io = random.uniform(5.0, 50.0)            # disk I/O (MB/s)
        network_io = random.uniform(1.0, 20.0)         # network I/O (MB/s)
        active_builds = random.randint(0, 10)          # active builds (0-10)
        queue_length = random.randint(0, 50)           # queue length (0-50)
        response_time = random.uniform(100.0, 2000.0)  # response time (ms)

        return PerformanceMetrics(
            timestamp=current_time,
            cpu_usage=cpu_usage,
            memory_usage=memory_usage,
            disk_io=disk_io,
            network_io=network_io,
            active_builds=active_builds,
            queue_length=queue_length,
            response_time=response_time
        )
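    # The method below is a hedged sketch of what real metric collection could
    # look like using the third-party `psutil` package. psutil is NOT a
    # dependency of this module (an assumption), and this sketch is never
    # called; the simulated `_collect_performance_metrics` above remains the
    # module's actual collector. Build/queue depth would come from the Forge
    # scheduler, so those fields are left as placeholders here.
    def _collect_system_metrics_sketch(self) -> PerformanceMetrics:
        """Sketch: collect real host metrics via psutil (assumed dependency)."""
        import psutil  # assumed available; intentionally not a module import

        disk = psutil.disk_io_counters()  # may be None on some platforms
        net = psutil.net_io_counters()
        return PerformanceMetrics(
            timestamp=time.time(),
            cpu_usage=psutil.cpu_percent(interval=None),
            memory_usage=psutil.virtual_memory().percent,
            # These counters are cumulative bytes; a real implementation would
            # diff successive samples and divide by the interval to get MB/s.
            disk_io=(disk.read_bytes + disk.write_bytes) / 1e6 if disk else 0.0,
            network_io=(net.bytes_sent + net.bytes_recv) / 1e6 if net else 0.0,
            # Placeholders: real values would come from the Forge scheduler.
            active_builds=0,
            queue_length=0,
            response_time=0.0,
        )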
    def _store_performance_metrics(self, metrics: PerformanceMetrics):
        """Store a performance metrics snapshot in the database."""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO performance_metrics
                (timestamp, cpu_usage, memory_usage, disk_io, network_io,
                 active_builds, queue_length, response_time)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                metrics.timestamp, metrics.cpu_usage, metrics.memory_usage,
                metrics.disk_io, metrics.network_io, metrics.active_builds,
                metrics.queue_length, metrics.response_time
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Failed to store metrics: {e}")

    def get_performance_history(self, hours: int = 24) -> List[PerformanceMetrics]:
        """Get performance metrics recorded within the last `hours` hours."""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()
            cutoff_time = time.time() - (hours * 3600)
            cursor.execute("""
                SELECT timestamp, cpu_usage, memory_usage, disk_io, network_io,
                       active_builds, queue_length, response_time
                FROM performance_metrics
                WHERE timestamp > ?
                ORDER BY timestamp DESC
            """, (cutoff_time,))

            results = []
            for row in cursor.fetchall():
                results.append(PerformanceMetrics(
                    timestamp=row[0],
                    cpu_usage=row[1],
                    memory_usage=row[2],
                    disk_io=row[3],
                    network_io=row[4],
                    active_builds=row[5],
                    queue_length=row[6],
                    response_time=row[7]
                ))
            conn.close()
            return results
        except Exception as e:
            print(f"Failed to retrieve performance history: {e}")
            return []

    def run_load_test(self, test_name: str, concurrent_users: int,
                      duration_seconds: int = 300) -> LoadTestResult:
        """Run a load test simulation.

        Note: "users" are simulated sequentially within each pass, so this
        exercises the bookkeeping rather than true concurrency.
        """
        print(f"🚀 Starting load test: {test_name}")
        print(f"   Concurrent users: {concurrent_users}")
        print(f"   Duration: {duration_seconds} seconds")

        start_time = time.time()
        total_requests = 0
        successful_requests = 0
        failed_requests = 0
        response_times = []

        # Simulate load until the requested duration has elapsed
        while time.time() - start_time < duration_seconds:
            for _ in range(concurrent_users):
                request_start = time.time()

                # Simulate request processing
                time.sleep(random.uniform(0.1, 2.0))

                # Simulate success/failure with a 95% success rate
                if random.random() > 0.05:
                    successful_requests += 1
                else:
                    failed_requests += 1

                response_times.append((time.time() - request_start) * 1000)  # ms
                total_requests += 1

            # Small delay between passes
            time.sleep(0.01)

        # Aggregate over the actual elapsed time: the loop can overshoot
        # duration_seconds by up to one full pass, so dividing by the nominal
        # duration would overstate throughput
        elapsed = time.time() - start_time
        if response_times:
            average_response_time = sum(response_times) / len(response_times)
            max_response_time = max(response_times)
            min_response_time = min(response_times)
        else:
            average_response_time = max_response_time = min_response_time = 0
        throughput = total_requests / elapsed if elapsed > 0 else 0
        error_rate = (failed_requests / total_requests) * 100 if total_requests > 0 else 0

        result = LoadTestResult(
            test_name=test_name,
            concurrent_users=concurrent_users,
            total_requests=total_requests,
            successful_requests=successful_requests,
            failed_requests=failed_requests,
            average_response_time=average_response_time,
            max_response_time=max_response_time,
            min_response_time=min_response_time,
            throughput=throughput,
            error_rate=error_rate
        )
        self._store_load_test_result(result)
        return result
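    # Because `run_load_test` issues its simulated requests sequentially, the
    # sketch below shows how genuinely concurrent load could be generated with
    # the standard library. It is illustrative only: `request_fn` is a
    # hypothetical caller-supplied callable (e.g., an HTTP call against the
    # Forge API), not an existing part of this module.
    def _run_concurrent_load_sketch(self, request_fn, concurrent_users: int,
                                    total_requests: int) -> List[float]:
        """Sketch: issue requests concurrently; return response times in ms."""
        from concurrent.futures import ThreadPoolExecutor

        def timed_request() -> float:
            start = time.time()
            request_fn()  # hypothetical request callable
            return (time.time() - start) * 1000.0

        # One worker per simulated user; each future is one request
        with ThreadPoolExecutor(max_workers=concurrent_users) as pool:
            futures = [pool.submit(timed_request) for _ in range(total_requests)]
            return [f.result() for f in futures]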
    def _store_load_test_result(self, result: LoadTestResult):
        """Store a load test result in the database."""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO load_tests
                (timestamp, test_name, concurrent_users, total_requests,
                 successful_requests, failed_requests, average_response_time,
                 max_response_time, min_response_time, throughput, error_rate)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                time.time(), result.test_name, result.concurrent_users,
                result.total_requests, result.successful_requests,
                result.failed_requests, result.average_response_time,
                result.max_response_time, result.min_response_time,
                result.throughput, result.error_rate
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Failed to store load test result: {e}")

    def get_load_test_history(self) -> List[LoadTestResult]:
        """Get all stored load test results, newest first."""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()
            cursor.execute("""
                SELECT timestamp, test_name, concurrent_users, total_requests,
                       successful_requests, failed_requests, average_response_time,
                       max_response_time, min_response_time, throughput, error_rate
                FROM load_tests
                ORDER BY timestamp DESC
            """)

            results = []
            for row in cursor.fetchall():
                results.append(LoadTestResult(
                    test_name=row[1],
                    concurrent_users=row[2],
                    total_requests=row[3],
                    successful_requests=row[4],
                    failed_requests=row[5],
                    average_response_time=row[6],
                    max_response_time=row[7],
                    min_response_time=row[8],
                    throughput=row[9],
                    error_rate=row[10]
                ))
            conn.close()
            return results
        except Exception as e:
            print(f"Failed to retrieve load test history: {e}")
            return []

    def analyze_performance_bottlenecks(self) -> List[Dict[str, Any]]:
        """Analyze recent performance data for bottlenecks."""
        bottlenecks = []
        try:
            # Look at the last hour of data
            recent_metrics = self.get_performance_history(hours=1)
            if not recent_metrics:
                return bottlenecks

            # CPU usage
            avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
            if avg_cpu > 80:
                bottlenecks.append({
                    "category": "CPU",
                    "severity": "high" if avg_cpu > 90 else "medium",
                    "description": f"High CPU usage: {avg_cpu:.1f}%",
                    "recommendation": "Consider scaling CPU resources or optimizing build processes"
                })

            # Memory usage
            avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
            if avg_memory > 85:
                bottlenecks.append({
                    "category": "Memory",
                    "severity": "high" if avg_memory > 95 else "medium",
                    "description": f"High memory usage: {avg_memory:.1f}%",
                    "recommendation": "Consider increasing memory or implementing memory optimization"
                })

            # Response times
            avg_response = sum(m.response_time for m in recent_metrics) / len(recent_metrics)
            if avg_response > 1000:  # > 1 second
                bottlenecks.append({
                    "category": "Response Time",
                    "severity": "high" if avg_response > 2000 else "medium",
                    "description": f"Slow response time: {avg_response:.1f}ms",
                    "recommendation": "Investigate slow operations and optimize critical paths"
                })

            # Queue length
            avg_queue = sum(m.queue_length for m in recent_metrics) / len(recent_metrics)
            if avg_queue > 20:
                bottlenecks.append({
                    "category": "Queue",
                    "severity": "high" if avg_queue > 40 else "medium",
                    "description": f"Long build queue: {avg_queue:.1f} builds",
                    "recommendation": "Consider adding more build workers or optimizing build times"
                })
        except Exception as e:
            bottlenecks.append({
                "category": "Analysis",
                "severity": "medium",
                "description": f"Performance analysis failed: {e}",
                "recommendation": "Check monitoring system and data collection"
            })

        return bottlenecks
{avg_queue:.1f} builds", "recommendation": "Consider adding more build workers or optimizing build times" }) except Exception as e: bottlenecks.append({ "category": "Analysis", "severity": "medium", "description": f"Performance analysis failed: {e}", "recommendation": "Check monitoring system and data collection" }) return bottlenecks def generate_optimization_recommendations(self) -> List[Dict[str, Any]]: """Generate optimization recommendations based on performance analysis""" recommendations = [] # Analyze bottlenecks bottlenecks = self.analyze_performance_bottlenecks() for bottleneck in bottlenecks: recommendations.append({ "timestamp": time.time(), "category": bottleneck["category"], "description": bottleneck["description"], "priority": bottleneck["severity"], "impact": "High" if bottleneck["severity"] == "high" else "Medium", "implementation_effort": "Medium", "status": "pending" }) # Add general optimization recommendations general_recommendations = [ { "timestamp": time.time(), "category": "Build Optimization", "description": "Implement build caching to reduce redundant operations", "priority": "medium", "impact": "Medium", "implementation_effort": "Low", "status": "pending" }, { "timestamp": time.time(), "category": "Resource Management", "description": "Implement resource pooling for better utilization", "priority": "medium", "impact": "Medium", "implementation_effort": "Medium", "status": "pending" }, { "timestamp": time.time(), "category": "Monitoring", "description": "Add real-time alerting for performance thresholds", "priority": "low", "impact": "Low", "implementation_effort": "Low", "status": "pending" } ] recommendations.extend(general_recommendations) # Store recommendations self._store_optimization_recommendations(recommendations) return recommendations def _store_optimization_recommendations(self, recommendations: List[Dict[str, Any]]): """Store optimization recommendations in database""" try: conn = sqlite3.connect(self.metrics_db) cursor = conn.cursor() for rec in recommendations: cursor.execute(""" INSERT INTO optimization_recommendations (timestamp, category, description, priority, impact, implementation_effort, status) VALUES (?, ?, ?, ?, ?, ?, ?) 
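    # Recommendations are inserted with status 'pending', but nothing in this
    # module ever transitions them. The helper below is a hypothetical sketch
    # of how a caller might mark one as done or dismissed; the method name and
    # status values are assumptions, not an existing Forge API.
    def mark_recommendation_status_sketch(self, rec_id: int, status: str = "done"):
        """Sketch: update the status column of a stored recommendation."""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE optimization_recommendations SET status = ? WHERE id = ?",
                (status, rec_id),
            )
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Failed to update recommendation status: {e}")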
""", ( rec["timestamp"], rec["category"], rec["description"], rec["priority"], rec["impact"], rec["implementation_effort"], rec["status"] )) conn.commit() conn.close() except Exception as e: print(f"Failed to store optimization recommendations: {e}") def get_performance_summary(self) -> Dict[str, Any]: """Get comprehensive performance summary""" try: # Get recent metrics recent_metrics = self.get_performance_history(hours=1) if not recent_metrics: return {"error": "No performance data available"} # Calculate averages avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics) avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics) avg_response = sum(m.response_time for m in recent_metrics) / len(recent_metrics) avg_queue = sum(m.queue_length for m in recent_metrics) / len(recent_metrics) # Get bottlenecks bottlenecks = self.analyze_performance_bottlenecks() # Get recommendations recommendations = self.generate_optimization_recommendations() summary = { "timestamp": time.time(), "current_metrics": { "cpu_usage": avg_cpu, "memory_usage": avg_memory, "response_time": avg_response, "queue_length": avg_queue }, "bottlenecks": bottlenecks, "recommendations": recommendations, "status": "healthy" if not bottlenecks else "needs_attention" } return summary except Exception as e: return {"error": f"Failed to generate performance summary: {e}"} def cleanup_old_metrics(self, days: int = 30): """Clean up old performance metrics""" try: conn = sqlite3.connect(self.metrics_db) cursor = conn.cursor() cutoff_time = time.time() - (days * 24 * 3600) # Clean up old performance metrics cursor.execute("DELETE FROM performance_metrics WHERE timestamp < ?", (cutoff_time,)) metrics_deleted = cursor.rowcount # Clean up old load tests cursor.execute("DELETE FROM load_tests WHERE timestamp < ?", (cutoff_time,)) load_tests_deleted = cursor.rowcount conn.commit() conn.close() print(f"Cleaned up {metrics_deleted} old performance metrics and {load_tests_deleted} old load tests") except Exception as e: print(f"Failed to cleanup old metrics: {e}")