#!/usr/bin/env python3 """ Debian Forge System Administration Interface (Simplified) This module provides system administration functionality without external dependencies: - System configuration management - Build monitoring and health checks - Resource usage tracking (simulated) - System maintenance tools """ import json import os import sqlite3 import time from typing import Dict, List, Optional, Any from dataclasses import dataclass from pathlib import Path @dataclass class SystemStatus: """System status information""" status: str cpu_percent: float memory_percent: float disk_percent: float active_builds: int queued_builds: int total_builds: int uptime: str last_updated: str @dataclass class BuildStatistics: """Build statistics and metrics""" total_builds: int successful_builds: int failed_builds: int average_build_time: float builds_per_day: float most_used_blueprint: str resource_usage_trend: Dict[str, float] @dataclass class SystemConfiguration: """System configuration settings""" max_concurrent_builds: int resource_limits: Dict[str, int] cleanup_policies: Dict[str, Any] security_settings: Dict[str, Any] notification_settings: Dict[str, Any] class AdminInterfaceSimple: """Simplified system administration interface for Debian Forge""" def __init__(self, db_path: str = "admin.db"): self.db_path = db_path self._init_database() def _init_database(self): """Initialize the admin database""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create system configuration table cursor.execute(""" CREATE TABLE IF NOT EXISTS system_config ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # Create system logs table cursor.execute(""" CREATE TABLE IF NOT EXISTS system_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, level TEXT NOT NULL, message TEXT NOT NULL, component TEXT NOT NULL, timestamp TEXT NOT NULL ) """) # Create maintenance tasks table cursor.execute(""" CREATE TABLE IF NOT EXISTS maintenance_tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_name TEXT NOT NULL, status TEXT NOT NULL, scheduled_at TEXT NOT NULL, completed_at TEXT, result TEXT ) """) # Insert default configuration default_config = { "max_concurrent_builds": "4", "cpu_limit": "80", "memory_limit": "85", "disk_limit": "90", "cleanup_interval": "3600", "log_retention_days": "30", "backup_interval": "86400" } for key, value in default_config.items(): cursor.execute(""" INSERT OR IGNORE INTO system_config (key, value, updated_at) VALUES (?, ?, ?) """, (key, value, time.strftime("%Y-%m-%d %H:%M:%S"))) conn.commit() conn.close() def get_system_status(self) -> SystemStatus: """Get current system status (with simulated resource data)""" import random # Simulate resource usage cpu_percent = random.uniform(20, 80) memory_percent = random.uniform(30, 85) disk_percent = random.uniform(60, 75) # Get build statistics (simulated for now) active_builds = self._get_active_builds_count() queued_builds = self._get_queued_builds_count() total_builds = self._get_total_builds_count() # Simulate system uptime uptime = "2d 14h 37m" # Determine overall status status = "healthy" if cpu_percent > 90 or memory_percent > 90 or disk_percent > 90: status = "warning" if cpu_percent > 95 or memory_percent > 95 or disk_percent > 95: status = "critical" return SystemStatus( status=status, cpu_percent=cpu_percent, memory_percent=memory_percent, disk_percent=disk_percent, active_builds=active_builds, queued_builds=queued_builds, total_builds=total_builds, uptime=uptime, last_updated=time.strftime("%Y-%m-%d %H:%M:%S") ) def _get_active_builds_count(self) -> int: """Get number of active builds (simulated)""" # In real implementation, this would query the build orchestrator return 2 def _get_queued_builds_count(self) -> int: """Get number of queued builds (simulated)""" # In real implementation, this would query the build queue return 1 def _get_total_builds_count(self) -> int: """Get total number of builds (simulated)""" # In real implementation, this would query the build history return 47 def get_build_statistics(self) -> BuildStatistics: """Get build statistics and metrics""" # Simulated statistics - in real implementation, query build database return BuildStatistics( total_builds=47, successful_builds=42, failed_builds=5, average_build_time=1847.5, # seconds builds_per_day=12.3, most_used_blueprint="debian-atomic-base", resource_usage_trend={ "cpu_avg": 65.2, "memory_avg": 78.4, "disk_growth": 2.1 } ) def get_system_configuration(self) -> SystemConfiguration: """Get system configuration""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute("SELECT key, value FROM system_config") config_data = dict(cursor.fetchall()) conn.close() return SystemConfiguration( max_concurrent_builds=int(config_data.get("max_concurrent_builds", "4")), resource_limits={ "cpu": int(config_data.get("cpu_limit", "80")), "memory": int(config_data.get("memory_limit", "85")), "disk": int(config_data.get("disk_limit", "90")) }, cleanup_policies={ "interval": int(config_data.get("cleanup_interval", "3600")), "log_retention_days": int(config_data.get("log_retention_days", "30")) }, security_settings={ "require_authentication": True, "session_timeout": 3600, "password_complexity": True }, notification_settings={ "email_alerts": False, "slack_integration": False, "webhook_url": None } ) def update_system_configuration(self, config: SystemConfiguration) -> bool: """Update system configuration""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() timestamp = time.strftime("%Y-%m-%d %H:%M:%S") # Update configuration values config_updates = [ ("max_concurrent_builds", str(config.max_concurrent_builds)), ("cpu_limit", str(config.resource_limits["cpu"])), ("memory_limit", str(config.resource_limits["memory"])), ("disk_limit", str(config.resource_limits["disk"])), ("cleanup_interval", str(config.cleanup_policies["interval"])), ("log_retention_days", str(config.cleanup_policies["log_retention_days"])) ] for key, value in config_updates: cursor.execute(""" INSERT OR REPLACE INTO system_config (key, value, updated_at) VALUES (?, ?, ?) """, (key, value, timestamp)) conn.commit() conn.close() # Log configuration change self.log_event("INFO", "System configuration updated", "admin_interface") return True except Exception as e: self.log_event("ERROR", f"Failed to update configuration: {e}", "admin_interface") return False def log_event(self, level: str, message: str, component: str): """Log system event""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" INSERT INTO system_logs (level, message, component, timestamp) VALUES (?, ?, ?, ?) """, (level, message, component, time.strftime("%Y-%m-%d %H:%M:%S"))) conn.commit() conn.close() except Exception as e: print(f"Failed to log event: {e}") def get_system_logs(self, limit: int = 100, level: Optional[str] = None) -> List[Dict[str, Any]]: """Get system logs""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() if level: cursor.execute(""" SELECT level, message, component, timestamp FROM system_logs WHERE level = ? ORDER BY timestamp DESC LIMIT ? """, (level, limit)) else: cursor.execute(""" SELECT level, message, component, timestamp FROM system_logs ORDER BY timestamp DESC LIMIT ? """, (limit,)) logs = [] for row in cursor.fetchall(): logs.append({ "level": row[0], "message": row[1], "component": row[2], "timestamp": row[3] }) conn.close() return logs except Exception as e: self.log_event("ERROR", f"Failed to retrieve logs: {e}", "admin_interface") return [] def schedule_maintenance_task(self, task_name: str, scheduled_time: str) -> bool: """Schedule a maintenance task""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" INSERT INTO maintenance_tasks (task_name, status, scheduled_at) VALUES (?, ?, ?) """, (task_name, "scheduled", scheduled_time)) conn.commit() conn.close() self.log_event("INFO", f"Maintenance task scheduled: {task_name}", "admin_interface") return True except Exception as e: self.log_event("ERROR", f"Failed to schedule task: {e}", "admin_interface") return False def get_maintenance_tasks(self) -> List[Dict[str, Any]]: """Get maintenance tasks""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" SELECT task_name, status, scheduled_at, completed_at, result FROM maintenance_tasks ORDER BY scheduled_at DESC """) tasks = [] for row in cursor.fetchall(): tasks.append({ "task_name": row[0], "status": row[1], "scheduled_at": row[2], "completed_at": row[3], "result": row[4] }) conn.close() return tasks except Exception as e: self.log_event("ERROR", f"Failed to retrieve tasks: {e}", "admin_interface") return [] def run_system_cleanup(self) -> Dict[str, Any]: """Run system cleanup tasks""" results = { "status": "success", "tasks_completed": [], "tasks_failed": [], "cleanup_summary": {} } try: # Cleanup old logs self._cleanup_old_logs() results["tasks_completed"].append("log_cleanup") # Cleanup temporary files temp_cleaned = self._cleanup_temp_files() results["tasks_completed"].append("temp_cleanup") results["cleanup_summary"]["temp_files_removed"] = temp_cleaned # Cleanup old build artifacts (simulated) artifacts_cleaned = self._cleanup_old_artifacts() results["tasks_completed"].append("artifact_cleanup") results["cleanup_summary"]["artifacts_removed"] = artifacts_cleaned self.log_event("INFO", "System cleanup completed successfully", "admin_interface") except Exception as e: results["status"] = "partial_failure" results["tasks_failed"].append(f"cleanup_error: {e}") self.log_event("ERROR", f"System cleanup failed: {e}", "admin_interface") return results def _cleanup_old_logs(self): """Cleanup old log entries""" config = self.get_system_configuration() retention_days = config.cleanup_policies["log_retention_days"] conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cutoff_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - (retention_days * 86400))) cursor.execute("DELETE FROM system_logs WHERE timestamp < ?", (cutoff_date,)) conn.commit() conn.close() def _cleanup_temp_files(self) -> int: """Cleanup temporary files""" temp_dirs = ["/tmp", "/var/tmp"] files_removed = 0 for temp_dir in temp_dirs: if os.path.exists(temp_dir): for filename in os.listdir(temp_dir): if filename.startswith("debian-forge-") or filename.startswith("osbuild-"): filepath = os.path.join(temp_dir, filename) try: if os.path.isfile(filepath): os.remove(filepath) files_removed += 1 elif os.path.isdir(filepath): import shutil shutil.rmtree(filepath) files_removed += 1 except Exception: pass # Ignore errors for individual files return files_removed def _cleanup_old_artifacts(self) -> int: """Cleanup old build artifacts (simulated)""" # In real implementation, this would cleanup old build artifacts # based on configured retention policies return 15 # Simulated number of artifacts removed def get_resource_usage_history(self, hours: int = 24) -> Dict[str, List[float]]: """Get resource usage history (simulated data)""" # In real implementation, this would query stored metrics import random data_points = hours * 6 # Every 10 minutes return { "timestamps": [i * 10 for i in range(data_points)], # Minutes ago "cpu_usage": [random.uniform(20, 80) for _ in range(data_points)], "memory_usage": [random.uniform(30, 85) for _ in range(data_points)], "disk_usage": [random.uniform(60, 75) for _ in range(data_points)] } def restart_services(self, services: List[str]) -> Dict[str, bool]: """Restart system services (simulated)""" results = {} for service in services: try: # In real implementation, this would use systemctl or similar self.log_event("INFO", f"Restarting service: {service}", "admin_interface") results[service] = True except Exception as e: self.log_event("ERROR", f"Failed to restart {service}: {e}", "admin_interface") results[service] = False return results def get_admin_dashboard_data(self) -> Dict[str, Any]: """Get comprehensive dashboard data for admin interface""" return { "system_status": self.get_system_status().__dict__, "build_statistics": self.get_build_statistics().__dict__, "system_configuration": self.get_system_configuration().__dict__, "recent_logs": self.get_system_logs(limit=10), "maintenance_tasks": self.get_maintenance_tasks(), "resource_history": self.get_resource_usage_history(hours=1) }