debian-forge-composer/production_optimization.py

#!/usr/bin/env python3
"""
Debian Forge Production Optimization Module
This module provides performance optimization, load testing, and production
monitoring capabilities for the Debian Forge system.
"""
import json
import time
import sqlite3
import threading
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from pathlib import Path
import random
@dataclass
class PerformanceMetrics:
"""Performance metrics for monitoring"""
timestamp: float
cpu_usage: float
memory_usage: float
disk_io: float
network_io: float
active_builds: int
queue_length: int
response_time: float
@dataclass
class LoadTestResult:
"""Result of a load test"""
test_name: str
concurrent_users: int
total_requests: int
successful_requests: int
failed_requests: int
average_response_time: float
max_response_time: float
min_response_time: float
throughput: float # requests per second
error_rate: float
class ProductionOptimization:
"""Production optimization and monitoring for Debian Forge"""
def __init__(self, metrics_db: str = "production_metrics.db"):
self.metrics_db = metrics_db
self._init_metrics_db()
self.monitoring_active = False
self.monitoring_thread = None
def _init_metrics_db(self):
"""Initialize metrics database"""
conn = sqlite3.connect(self.metrics_db)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
cpu_usage REAL NOT NULL,
memory_usage REAL NOT NULL,
disk_io REAL NOT NULL,
network_io REAL NOT NULL,
active_builds INTEGER NOT NULL,
queue_length INTEGER NOT NULL,
response_time REAL NOT NULL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS load_tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
test_name TEXT NOT NULL,
concurrent_users INTEGER NOT NULL,
total_requests INTEGER NOT NULL,
successful_requests INTEGER NOT NULL,
failed_requests INTEGER NOT NULL,
average_response_time REAL NOT NULL,
max_response_time REAL NOT NULL,
min_response_time REAL NOT NULL,
throughput REAL NOT NULL,
error_rate REAL NOT NULL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS optimization_recommendations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
category TEXT NOT NULL,
description TEXT NOT NULL,
priority TEXT NOT NULL,
impact TEXT NOT NULL,
implementation_effort TEXT NOT NULL,
status TEXT DEFAULT 'pending'
)
""")
conn.commit()
conn.close()
def start_performance_monitoring(self, interval_seconds: int = 30):
"""Start continuous performance monitoring"""
if self.monitoring_active:
return False
self.monitoring_active = True
self.monitoring_thread = threading.Thread(
target=self._monitoring_loop,
args=(interval_seconds,),
daemon=True
)
self.monitoring_thread.start()
return True
def stop_performance_monitoring(self):
"""Stop performance monitoring"""
self.monitoring_active = False
if self.monitoring_thread:
self.monitoring_thread.join()
def _monitoring_loop(self, interval_seconds: int):
"""Main monitoring loop"""
while self.monitoring_active:
try:
metrics = self._collect_performance_metrics()
self._store_performance_metrics(metrics)
time.sleep(interval_seconds)
except Exception as e:
print(f"Monitoring error: {e}")
time.sleep(interval_seconds)
def _collect_performance_metrics(self) -> PerformanceMetrics:
"""Collect current performance metrics"""
# Simulated metrics for demonstration only. In production these would come
# from actual system monitoring (see the psutil-based sketch at the end of
# this module).
current_time = time.time()
# Simulate CPU usage (0-100%)
cpu_usage = random.uniform(20.0, 80.0)
# Simulate memory usage (0-100%)
memory_usage = random.uniform(30.0, 90.0)
# Simulate disk I/O (MB/s)
disk_io = random.uniform(5.0, 50.0)
# Simulate network I/O (MB/s)
network_io = random.uniform(1.0, 20.0)
# Simulate active builds (0-10)
active_builds = random.randint(0, 10)
# Simulate queue length (0-50)
queue_length = random.randint(0, 50)
# Simulate response time (ms)
response_time = random.uniform(100.0, 2000.0)
return PerformanceMetrics(
timestamp=current_time,
cpu_usage=cpu_usage,
memory_usage=memory_usage,
disk_io=disk_io,
network_io=network_io,
active_builds=active_builds,
queue_length=queue_length,
response_time=response_time
)
def _store_performance_metrics(self, metrics: PerformanceMetrics):
"""Store performance metrics in database"""
try:
conn = sqlite3.connect(self.metrics_db)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO performance_metrics
(timestamp, cpu_usage, memory_usage, disk_io, network_io,
active_builds, queue_length, response_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
metrics.timestamp,
metrics.cpu_usage,
metrics.memory_usage,
metrics.disk_io,
metrics.network_io,
metrics.active_builds,
metrics.queue_length,
metrics.response_time
))
conn.commit()
conn.close()
except Exception as e:
print(f"Failed to store metrics: {e}")
def get_performance_history(self, hours: int = 24) -> List[PerformanceMetrics]:
"""Get performance metrics history"""
try:
conn = sqlite3.connect(self.metrics_db)
cursor = conn.cursor()
cutoff_time = time.time() - (hours * 3600)
cursor.execute("""
SELECT timestamp, cpu_usage, memory_usage, disk_io, network_io,
active_builds, queue_length, response_time
FROM performance_metrics
WHERE timestamp > ?
ORDER BY timestamp DESC
""", (cutoff_time,))
results = []
for row in cursor.fetchall():
metrics = PerformanceMetrics(
timestamp=row[0],
cpu_usage=row[1],
memory_usage=row[2],
disk_io=row[3],
network_io=row[4],
active_builds=row[5],
queue_length=row[6],
response_time=row[7]
)
results.append(metrics)
conn.close()
return results
except Exception as e:
print(f"Failed to retrieve performance history: {e}")
return []
def run_load_test(self, test_name: str, concurrent_users: int,
duration_seconds: int = 300) -> LoadTestResult:
"""Run a load test simulation"""
print(f"🚀 Starting load test: {test_name}")
print(f" Concurrent users: {concurrent_users}")
print(f" Duration: {duration_seconds} seconds")
start_time = time.time()
total_requests = 0
successful_requests = 0
failed_requests = 0
response_times = []
# Simulate load test
while time.time() - start_time < duration_seconds:
# Simulate one batch of user requests. Requests are issued sequentially here,
# so "concurrency" is only simulated (a genuinely concurrent sketch appears at
# the end of this module).
for user in range(concurrent_users):
request_start = time.time()
# Simulate request processing
processing_time = random.uniform(0.1, 2.0)
time.sleep(processing_time)
# Simulate success/failure
success = random.random() > 0.05 # 95% success rate
if success:
successful_requests += 1
else:
failed_requests += 1
response_time = (time.time() - request_start) * 1000 # Convert to ms
response_times.append(response_time)
total_requests += 1
# Small delay between requests
time.sleep(0.01)
# Calculate metrics
if response_times:
average_response_time = sum(response_times) / len(response_times)
max_response_time = max(response_times)
min_response_time = min(response_times)
else:
average_response_time = max_response_time = min_response_time = 0
# Use the actual elapsed time: the loop above only checks the duration at the
# start of each batch, so the test can overrun duration_seconds.
elapsed_seconds = time.time() - start_time
throughput = total_requests / elapsed_seconds if elapsed_seconds > 0 else 0.0
error_rate = (failed_requests / total_requests) * 100 if total_requests > 0 else 0
result = LoadTestResult(
test_name=test_name,
concurrent_users=concurrent_users,
total_requests=total_requests,
successful_requests=successful_requests,
failed_requests=failed_requests,
average_response_time=average_response_time,
max_response_time=max_response_time,
min_response_time=min_response_time,
throughput=throughput,
error_rate=error_rate
)
# Store load test result
self._store_load_test_result(result)
return result
def _store_load_test_result(self, result: LoadTestResult):
"""Store load test result in database"""
try:
conn = sqlite3.connect(self.metrics_db)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO load_tests
(timestamp, test_name, concurrent_users, total_requests,
successful_requests, failed_requests, average_response_time,
max_response_time, min_response_time, throughput, error_rate)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
time.time(),
result.test_name,
result.concurrent_users,
result.total_requests,
result.successful_requests,
result.failed_requests,
result.average_response_time,
result.max_response_time,
result.min_response_time,
result.throughput,
result.error_rate
))
conn.commit()
conn.close()
except Exception as e:
print(f"Failed to store load test result: {e}")
def get_load_test_history(self) -> List[LoadTestResult]:
"""Get load test history"""
try:
conn = sqlite3.connect(self.metrics_db)
cursor = conn.cursor()
cursor.execute("""
SELECT timestamp, test_name, concurrent_users, total_requests,
successful_requests, failed_requests, average_response_time,
max_response_time, min_response_time, throughput, error_rate
FROM load_tests
ORDER BY timestamp DESC
""")
results = []
for row in cursor.fetchall():
result = LoadTestResult(
test_name=row[1],
concurrent_users=row[2],
total_requests=row[3],
successful_requests=row[4],
failed_requests=row[5],
average_response_time=row[6],
max_response_time=row[7],
min_response_time=row[8],
throughput=row[9],
error_rate=row[10]
)
results.append(result)
conn.close()
return results
except Exception as e:
print(f"Failed to retrieve load test history: {e}")
return []
def analyze_performance_bottlenecks(self) -> List[Dict[str, Any]]:
"""Analyze performance data for bottlenecks"""
bottlenecks = []
try:
# Get recent performance data
recent_metrics = self.get_performance_history(hours=1)
if not recent_metrics:
return bottlenecks
# Analyze CPU usage
avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
if avg_cpu > 80:
bottlenecks.append({
"category": "CPU",
"severity": "high" if avg_cpu > 90 else "medium",
"description": f"High CPU usage: {avg_cpu:.1f}%",
"recommendation": "Consider scaling CPU resources or optimizing build processes"
})
# Analyze memory usage
avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
if avg_memory > 85:
bottlenecks.append({
"category": "Memory",
"severity": "high" if avg_memory > 95 else "medium",
"description": f"High memory usage: {avg_memory:.1f}%",
"recommendation": "Consider increasing memory or implementing memory optimization"
})
# Analyze response times
avg_response = sum(m.response_time for m in recent_metrics) / len(recent_metrics)
if avg_response > 1000: # > 1 second
bottlenecks.append({
"category": "Response Time",
"severity": "high" if avg_response > 2000 else "medium",
"description": f"Slow response time: {avg_response:.1f}ms",
"recommendation": "Investigate slow operations and optimize critical paths"
})
# Analyze queue length
avg_queue = sum(m.queue_length for m in recent_metrics) / len(recent_metrics)
if avg_queue > 20:
bottlenecks.append({
"category": "Queue",
"severity": "high" if avg_queue > 40 else "medium",
"description": f"Long build queue: {avg_queue:.1f} builds",
"recommendation": "Consider adding more build workers or optimizing build times"
})
except Exception as e:
bottlenecks.append({
"category": "Analysis",
"severity": "medium",
"description": f"Performance analysis failed: {e}",
"recommendation": "Check monitoring system and data collection"
})
return bottlenecks
def generate_optimization_recommendations(self) -> List[Dict[str, Any]]:
"""Generate optimization recommendations based on performance analysis"""
recommendations = []
# Analyze bottlenecks
bottlenecks = self.analyze_performance_bottlenecks()
for bottleneck in bottlenecks:
recommendations.append({
"timestamp": time.time(),
"category": bottleneck["category"],
"description": bottleneck["description"],
"priority": bottleneck["severity"],
"impact": "High" if bottleneck["severity"] == "high" else "Medium",
"implementation_effort": "Medium",
"status": "pending"
})
# Add general optimization recommendations
general_recommendations = [
{
"timestamp": time.time(),
"category": "Build Optimization",
"description": "Implement build caching to reduce redundant operations",
"priority": "medium",
"impact": "Medium",
"implementation_effort": "Low",
"status": "pending"
},
{
"timestamp": time.time(),
"category": "Resource Management",
"description": "Implement resource pooling for better utilization",
"priority": "medium",
"impact": "Medium",
"implementation_effort": "Medium",
"status": "pending"
},
{
"timestamp": time.time(),
"category": "Monitoring",
"description": "Add real-time alerting for performance thresholds",
"priority": "low",
"impact": "Low",
"implementation_effort": "Low",
"status": "pending"
}
]
recommendations.extend(general_recommendations)
# Store recommendations (each call inserts a fresh batch of rows, so repeated calls accumulate duplicates)
self._store_optimization_recommendations(recommendations)
return recommendations
def _store_optimization_recommendations(self, recommendations: List[Dict[str, Any]]):
"""Store optimization recommendations in database"""
try:
conn = sqlite3.connect(self.metrics_db)
cursor = conn.cursor()
for rec in recommendations:
cursor.execute("""
INSERT INTO optimization_recommendations
(timestamp, category, description, priority, impact,
implementation_effort, status)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
rec["timestamp"],
rec["category"],
rec["description"],
rec["priority"],
rec["impact"],
rec["implementation_effort"],
rec["status"]
))
conn.commit()
conn.close()
except Exception as e:
print(f"Failed to store optimization recommendations: {e}")
def get_performance_summary(self) -> Dict[str, Any]:
"""Get comprehensive performance summary"""
try:
# Get recent metrics
recent_metrics = self.get_performance_history(hours=1)
if not recent_metrics:
return {"error": "No performance data available"}
# Calculate averages
avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
avg_response = sum(m.response_time for m in recent_metrics) / len(recent_metrics)
avg_queue = sum(m.queue_length for m in recent_metrics) / len(recent_metrics)
# Get bottlenecks
bottlenecks = self.analyze_performance_bottlenecks()
# Generate fresh recommendations (note: this call also persists them to the database)
recommendations = self.generate_optimization_recommendations()
summary = {
"timestamp": time.time(),
"current_metrics": {
"cpu_usage": avg_cpu,
"memory_usage": avg_memory,
"response_time": avg_response,
"queue_length": avg_queue
},
"bottlenecks": bottlenecks,
"recommendations": recommendations,
"status": "healthy" if not bottlenecks else "needs_attention"
}
return summary
except Exception as e:
return {"error": f"Failed to generate performance summary: {e}"}
def cleanup_old_metrics(self, days: int = 30):
"""Clean up old performance metrics"""
try:
conn = sqlite3.connect(self.metrics_db)
cursor = conn.cursor()
cutoff_time = time.time() - (days * 24 * 3600)
# Clean up old performance metrics
cursor.execute("DELETE FROM performance_metrics WHERE timestamp < ?", (cutoff_time,))
metrics_deleted = cursor.rowcount
# Clean up old load tests
cursor.execute("DELETE FROM load_tests WHERE timestamp < ?", (cutoff_time,))
load_tests_deleted = cursor.rowcount
conn.commit()
conn.close()
print(f"Cleaned up {metrics_deleted} old performance metrics and {load_tests_deleted} old load tests")
except Exception as e:
print(f"Failed to cleanup old metrics: {e}")