#!/usr/bin/env python3
"""
Debian Forge Production Optimization Module

This module provides performance optimization, load testing, and production
monitoring capabilities for the Debian Forge system.
"""

import random
import sqlite3
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class PerformanceMetrics:
    """Performance metrics for monitoring"""
    timestamp: float
    cpu_usage: float
    memory_usage: float
    disk_io: float
    network_io: float
    active_builds: int
    queue_length: int
    response_time: float


@dataclass
class LoadTestResult:
    """Result of a load test"""
    test_name: str
    concurrent_users: int
    total_requests: int
    successful_requests: int
    failed_requests: int
    average_response_time: float
    max_response_time: float
    min_response_time: float
    throughput: float  # requests per second
    error_rate: float


class ProductionOptimization:
    """Production optimization and monitoring for Debian Forge"""

    def __init__(self, metrics_db: str = "production_metrics.db"):
        self.metrics_db = metrics_db
        self._init_metrics_db()
        self.monitoring_active = False
        self.monitoring_thread = None

    def _init_metrics_db(self):
        """Initialize metrics database"""
        conn = sqlite3.connect(self.metrics_db)
        cursor = conn.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS performance_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                cpu_usage REAL NOT NULL,
                memory_usage REAL NOT NULL,
                disk_io REAL NOT NULL,
                network_io REAL NOT NULL,
                active_builds INTEGER NOT NULL,
                queue_length INTEGER NOT NULL,
                response_time REAL NOT NULL
            )
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS load_tests (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                test_name TEXT NOT NULL,
                concurrent_users INTEGER NOT NULL,
                total_requests INTEGER NOT NULL,
                successful_requests INTEGER NOT NULL,
                failed_requests INTEGER NOT NULL,
                average_response_time REAL NOT NULL,
                max_response_time REAL NOT NULL,
                min_response_time REAL NOT NULL,
                throughput REAL NOT NULL,
                error_rate REAL NOT NULL
            )
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS optimization_recommendations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                category TEXT NOT NULL,
                description TEXT NOT NULL,
                priority TEXT NOT NULL,
                impact TEXT NOT NULL,
                implementation_effort TEXT NOT NULL,
                status TEXT DEFAULT 'pending'
            )
        """)

        conn.commit()
        conn.close()

    def start_performance_monitoring(self, interval_seconds: int = 30):
        """Start continuous performance monitoring"""
        if self.monitoring_active:
            return False

        self.monitoring_active = True
        self.monitoring_thread = threading.Thread(
            target=self._monitoring_loop,
            args=(interval_seconds,),
            daemon=True
        )
        self.monitoring_thread.start()
        return True

    def stop_performance_monitoring(self):
        """Stop performance monitoring"""
        self.monitoring_active = False
        if self.monitoring_thread:
            self.monitoring_thread.join()

    def _monitoring_loop(self, interval_seconds: int):
        """Main monitoring loop"""
        while self.monitoring_active:
            try:
                metrics = self._collect_performance_metrics()
                self._store_performance_metrics(metrics)
                time.sleep(interval_seconds)
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(interval_seconds)

    def _collect_performance_metrics(self) -> PerformanceMetrics:
        """Collect current performance metrics"""
        # Simulated metrics for demonstration.
        # In production, these would come from actual system monitoring.

        current_time = time.time()

        # Simulate CPU usage (0-100%)
        cpu_usage = random.uniform(20.0, 80.0)

        # Simulate memory usage (0-100%)
        memory_usage = random.uniform(30.0, 90.0)

        # Simulate disk I/O (MB/s)
        disk_io = random.uniform(5.0, 50.0)

        # Simulate network I/O (MB/s)
        network_io = random.uniform(1.0, 20.0)

        # Simulate active builds (0-10)
        active_builds = random.randint(0, 10)

        # Simulate queue length (0-50)
        queue_length = random.randint(0, 50)

        # Simulate response time (ms)
        response_time = random.uniform(100.0, 2000.0)

        return PerformanceMetrics(
            timestamp=current_time,
            cpu_usage=cpu_usage,
            memory_usage=memory_usage,
            disk_io=disk_io,
            network_io=network_io,
            active_builds=active_builds,
            queue_length=queue_length,
            response_time=response_time
        )

    def _store_performance_metrics(self, metrics: PerformanceMetrics):
        """Store performance metrics in database"""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()

            cursor.execute("""
                INSERT INTO performance_metrics
                (timestamp, cpu_usage, memory_usage, disk_io, network_io,
                 active_builds, queue_length, response_time)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                metrics.timestamp,
                metrics.cpu_usage,
                metrics.memory_usage,
                metrics.disk_io,
                metrics.network_io,
                metrics.active_builds,
                metrics.queue_length,
                metrics.response_time
            ))

            conn.commit()
            conn.close()

        except Exception as e:
            print(f"Failed to store metrics: {e}")

    def get_performance_history(self, hours: int = 24) -> List[PerformanceMetrics]:
        """Get performance metrics history"""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()

            cutoff_time = time.time() - (hours * 3600)

            cursor.execute("""
                SELECT timestamp, cpu_usage, memory_usage, disk_io, network_io,
                       active_builds, queue_length, response_time
                FROM performance_metrics
                WHERE timestamp > ?
                ORDER BY timestamp DESC
            """, (cutoff_time,))

            results = []
            for row in cursor.fetchall():
                metrics = PerformanceMetrics(
                    timestamp=row[0],
                    cpu_usage=row[1],
                    memory_usage=row[2],
                    disk_io=row[3],
                    network_io=row[4],
                    active_builds=row[5],
                    queue_length=row[6],
                    response_time=row[7]
                )
                results.append(metrics)

            conn.close()
            return results

        except Exception as e:
            print(f"Failed to retrieve performance history: {e}")
            return []

    def run_load_test(self, test_name: str, concurrent_users: int,
                      duration_seconds: int = 300) -> LoadTestResult:
        """Run a load test simulation"""

        print(f"🚀 Starting load test: {test_name}")
        print(f"   Concurrent users: {concurrent_users}")
        print(f"   Duration: {duration_seconds} seconds")

        start_time = time.time()
        total_requests = 0
        successful_requests = 0
        failed_requests = 0
        response_times = []

        # Simulate load test
        while time.time() - start_time < duration_seconds:
            # Simulate concurrent user requests
            for user in range(concurrent_users):
                request_start = time.time()

                # Simulate request processing
                processing_time = random.uniform(0.1, 2.0)
                time.sleep(processing_time)

                # Simulate success/failure
                success = random.random() > 0.05  # 95% success rate

                if success:
                    successful_requests += 1
                else:
                    failed_requests += 1

                response_time = (time.time() - request_start) * 1000  # Convert to ms
                response_times.append(response_time)
                total_requests += 1

            # Small delay between requests
            time.sleep(0.01)

        # Calculate metrics
        if response_times:
            average_response_time = sum(response_times) / len(response_times)
            max_response_time = max(response_times)
            min_response_time = min(response_times)
        else:
            average_response_time = max_response_time = min_response_time = 0

        throughput = total_requests / duration_seconds
        error_rate = (failed_requests / total_requests) * 100 if total_requests > 0 else 0

        result = LoadTestResult(
            test_name=test_name,
            concurrent_users=concurrent_users,
            total_requests=total_requests,
            successful_requests=successful_requests,
            failed_requests=failed_requests,
            average_response_time=average_response_time,
            max_response_time=max_response_time,
            min_response_time=min_response_time,
            throughput=throughput,
            error_rate=error_rate
        )

        # Store load test result
        self._store_load_test_result(result)

        return result

    def _store_load_test_result(self, result: LoadTestResult):
        """Store load test result in database"""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()

            cursor.execute("""
                INSERT INTO load_tests
                (timestamp, test_name, concurrent_users, total_requests,
                 successful_requests, failed_requests, average_response_time,
                 max_response_time, min_response_time, throughput, error_rate)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                time.time(),
                result.test_name,
                result.concurrent_users,
                result.total_requests,
                result.successful_requests,
                result.failed_requests,
                result.average_response_time,
                result.max_response_time,
                result.min_response_time,
                result.throughput,
                result.error_rate
            ))

            conn.commit()
            conn.close()

        except Exception as e:
            print(f"Failed to store load test result: {e}")

    def get_load_test_history(self) -> List[LoadTestResult]:
        """Get load test history"""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()

            cursor.execute("""
                SELECT timestamp, test_name, concurrent_users, total_requests,
                       successful_requests, failed_requests, average_response_time,
                       max_response_time, min_response_time, throughput, error_rate
                FROM load_tests
                ORDER BY timestamp DESC
            """)

            results = []
            for row in cursor.fetchall():
                result = LoadTestResult(
                    test_name=row[1],
                    concurrent_users=row[2],
                    total_requests=row[3],
                    successful_requests=row[4],
                    failed_requests=row[5],
                    average_response_time=row[6],
                    max_response_time=row[7],
                    min_response_time=row[8],
                    throughput=row[9],
                    error_rate=row[10]
                )
                results.append(result)

            conn.close()
            return results

        except Exception as e:
            print(f"Failed to retrieve load test history: {e}")
            return []

    def analyze_performance_bottlenecks(self) -> List[Dict[str, Any]]:
        """Analyze performance data for bottlenecks"""
        bottlenecks = []

        try:
            # Get recent performance data
            recent_metrics = self.get_performance_history(hours=1)

            if not recent_metrics:
                return bottlenecks

            # Analyze CPU usage
            avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
            if avg_cpu > 80:
                bottlenecks.append({
                    "category": "CPU",
                    "severity": "high" if avg_cpu > 90 else "medium",
                    "description": f"High CPU usage: {avg_cpu:.1f}%",
                    "recommendation": "Consider scaling CPU resources or optimizing build processes"
                })

            # Analyze memory usage
            avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
            if avg_memory > 85:
                bottlenecks.append({
                    "category": "Memory",
                    "severity": "high" if avg_memory > 95 else "medium",
                    "description": f"High memory usage: {avg_memory:.1f}%",
                    "recommendation": "Consider increasing memory or implementing memory optimization"
                })

            # Analyze response times
            avg_response = sum(m.response_time for m in recent_metrics) / len(recent_metrics)
            if avg_response > 1000:  # > 1 second
                bottlenecks.append({
                    "category": "Response Time",
                    "severity": "high" if avg_response > 2000 else "medium",
                    "description": f"Slow response time: {avg_response:.1f}ms",
                    "recommendation": "Investigate slow operations and optimize critical paths"
                })

            # Analyze queue length
            avg_queue = sum(m.queue_length for m in recent_metrics) / len(recent_metrics)
            if avg_queue > 20:
                bottlenecks.append({
                    "category": "Queue",
                    "severity": "high" if avg_queue > 40 else "medium",
                    "description": f"Long build queue: {avg_queue:.1f} builds",
                    "recommendation": "Consider adding more build workers or optimizing build times"
                })

        except Exception as e:
            bottlenecks.append({
                "category": "Analysis",
                "severity": "medium",
                "description": f"Performance analysis failed: {e}",
                "recommendation": "Check monitoring system and data collection"
            })

        return bottlenecks

    def generate_optimization_recommendations(self) -> List[Dict[str, Any]]:
        """Generate optimization recommendations based on performance analysis"""
        recommendations = []

        # Analyze bottlenecks
        bottlenecks = self.analyze_performance_bottlenecks()

        for bottleneck in bottlenecks:
            recommendations.append({
                "timestamp": time.time(),
                "category": bottleneck["category"],
                "description": bottleneck["description"],
                "priority": bottleneck["severity"],
                "impact": "High" if bottleneck["severity"] == "high" else "Medium",
                "implementation_effort": "Medium",
                "status": "pending"
            })

        # Add general optimization recommendations
        general_recommendations = [
            {
                "timestamp": time.time(),
                "category": "Build Optimization",
                "description": "Implement build caching to reduce redundant operations",
                "priority": "medium",
                "impact": "Medium",
                "implementation_effort": "Low",
                "status": "pending"
            },
            {
                "timestamp": time.time(),
                "category": "Resource Management",
                "description": "Implement resource pooling for better utilization",
                "priority": "medium",
                "impact": "Medium",
                "implementation_effort": "Medium",
                "status": "pending"
            },
            {
                "timestamp": time.time(),
                "category": "Monitoring",
                "description": "Add real-time alerting for performance thresholds",
                "priority": "low",
                "impact": "Low",
                "implementation_effort": "Low",
                "status": "pending"
            }
        ]

        recommendations.extend(general_recommendations)

        # Store recommendations
        self._store_optimization_recommendations(recommendations)

        return recommendations

    def _store_optimization_recommendations(self, recommendations: List[Dict[str, Any]]):
        """Store optimization recommendations in database"""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()

            for rec in recommendations:
                cursor.execute("""
                    INSERT INTO optimization_recommendations
                    (timestamp, category, description, priority, impact,
                     implementation_effort, status)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    rec["timestamp"],
                    rec["category"],
                    rec["description"],
                    rec["priority"],
                    rec["impact"],
                    rec["implementation_effort"],
                    rec["status"]
                ))

            conn.commit()
            conn.close()

        except Exception as e:
            print(f"Failed to store optimization recommendations: {e}")

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get comprehensive performance summary"""
        try:
            # Get recent metrics
            recent_metrics = self.get_performance_history(hours=1)

            if not recent_metrics:
                return {"error": "No performance data available"}

            # Calculate averages
            avg_cpu = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
            avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
            avg_response = sum(m.response_time for m in recent_metrics) / len(recent_metrics)
            avg_queue = sum(m.queue_length for m in recent_metrics) / len(recent_metrics)

            # Get bottlenecks
            bottlenecks = self.analyze_performance_bottlenecks()

            # Get recommendations
            recommendations = self.generate_optimization_recommendations()

            summary = {
                "timestamp": time.time(),
                "current_metrics": {
                    "cpu_usage": avg_cpu,
                    "memory_usage": avg_memory,
                    "response_time": avg_response,
                    "queue_length": avg_queue
                },
                "bottlenecks": bottlenecks,
                "recommendations": recommendations,
                "status": "healthy" if not bottlenecks else "needs_attention"
            }

            return summary

        except Exception as e:
            return {"error": f"Failed to generate performance summary: {e}"}

    def cleanup_old_metrics(self, days: int = 30):
        """Clean up old performance metrics"""
        try:
            conn = sqlite3.connect(self.metrics_db)
            cursor = conn.cursor()

            cutoff_time = time.time() - (days * 24 * 3600)

            # Clean up old performance metrics
            cursor.execute("DELETE FROM performance_metrics WHERE timestamp < ?", (cutoff_time,))
            metrics_deleted = cursor.rowcount

            # Clean up old load tests
            cursor.execute("DELETE FROM load_tests WHERE timestamp < ?", (cutoff_time,))
            load_tests_deleted = cursor.rowcount

            conn.commit()
            conn.close()

            print(f"Cleaned up {metrics_deleted} old performance metrics and {load_tests_deleted} old load tests")

        except Exception as e:
            print(f"Failed to cleanup old metrics: {e}")
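

# Minimal usage sketch: exercises the public methods defined above
# (start_performance_monitoring, run_load_test, get_performance_summary,
# stop_performance_monitoring). The database path, interval, and load-test
# parameters below are illustrative assumptions, not production values, and
# the metrics are the simulated ones this module generates.
if __name__ == "__main__":
    optimizer = ProductionOptimization(metrics_db="production_metrics.db")

    # Collect a few background samples with a short interval for the demo.
    optimizer.start_performance_monitoring(interval_seconds=5)
    time.sleep(15)

    # Run a short simulated load test and print its headline numbers.
    load_result = optimizer.run_load_test("smoke-test", concurrent_users=2,
                                          duration_seconds=10)
    print(f"Throughput: {load_result.throughput:.2f} req/s, "
          f"error rate: {load_result.error_rate:.1f}%")

    # Summarize current performance and any detected bottlenecks.
    summary = optimizer.get_performance_summary()
    print(f"Status: {summary.get('status', 'unknown')}, "
          f"bottlenecks: {len(summary.get('bottlenecks', []))}")

    optimizer.stop_performance_monitoring()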