#!/usr/bin/env python3
"""
Composer Status Monitor for Debian Forge

This module provides real-time monitoring of composer build status,
progress tracking, and status notifications.
"""

import json
import time
import threading
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path


@dataclass
class BuildProgress:
    """Represents build progress information for a single stage."""
    stage: str
    progress: float  # 0.0 to 1.0
    message: str
    timestamp: datetime
    details: Optional[Dict] = None


@dataclass
class BuildStatus:
    """Extended build status with progress tracking."""
    build_id: str
    status: str
    created_at: datetime
    updated_at: datetime
    blueprint: str
    target: str
    architecture: str
    progress: List[BuildProgress]
    logs: List[str]
    metadata: Optional[Dict] = None


class StatusMonitor:
    """Monitors build status and progress.

    A single daemon worker thread polls ``composer_client.get_compose_status``
    for every monitored build and invokes the registered callbacks whenever a
    build's ``status`` value changes between two polls.
    """

    def __init__(self, composer_client, poll_interval: float = 30):
        """Create a monitor.

        Args:
            composer_client: Object exposing ``get_compose_status(build_id)``
                that returns a dict-like status payload.
            poll_interval: Seconds to wait between two polling rounds.
        """
        self.client = composer_client
        self.poll_interval = poll_interval
        self.monitored_builds: Dict[str, BuildStatus] = {}
        self.status_callbacks: List[Callable[[BuildStatus], None]] = []
        self.monitoring_thread: Optional[threading.Thread] = None
        self.stop_monitoring = False
        # Guards monitored_builds and the worker-thread bookkeeping; it is
        # never held while calling the client or user callbacks.
        self._lock = threading.Lock()
        # Stop signal of the *current* worker; each worker gets its own event
        # so that a restart can never revive a worker that was told to stop.
        self._stop_event = threading.Event()

    def add_status_callback(self, callback: Callable[[BuildStatus], None]):
        """Add a callback for status updates."""
        self.status_callbacks.append(callback)

    def start_monitoring(self, build_id: str):
        """Start monitoring a specific build.

        Fetches the initial status when the build is not monitored yet and
        makes sure a polling worker is running.

        Returns:
            True when the build is being monitored, False when its initial
            status could not be fetched or converted.
        """
        with self._lock:
            already_monitored = build_id in self.monitored_builds

        if not already_monitored:
            # Get initial status
            try:
                status_data = self.client.get_compose_status(build_id)
                initial = self._convert_to_build_status(status_data, build_id)
            except Exception as e:
                print(f"Failed to get initial status for {build_id}: {e}")
                return False
            with self._lock:
                self.monitored_builds[build_id] = initial

        self._ensure_worker_running()
        return True

    def _ensure_worker_running(self):
        """Start a polling worker unless a usable one is already running."""
        with self._lock:
            worker = self.monitoring_thread
            if worker is not None and worker.is_alive() and not self.stop_monitoring:
                return
            # Either there is no worker, it has died, or it was asked to stop
            # but has not exited yet.  Retire the old stop event (so a
            # lingering worker still terminates) and start a fresh worker.
            self._stop_event.set()
            self._stop_event = threading.Event()
            self.stop_monitoring = False
            self.monitoring_thread = threading.Thread(
                target=self._monitoring_loop,
                args=(self._stop_event,),
                daemon=True,
            )
            self.monitoring_thread.start()

    def stop_monitoring_build(self, build_id: str):
        """Stop monitoring a specific build (no-op for unknown ids)."""
        with self._lock:
            self.monitored_builds.pop(build_id, None)

    def stop_all_monitoring(self):
        """Stop the polling worker.

        The set of monitored builds is left untouched.  Waits up to five
        seconds for the worker to exit; when called from the worker itself
        (for example from a status callback) it only signals the stop, since
        a thread cannot join itself.
        """
        self.stop_monitoring = True
        self._stop_event.set()
        worker = self.monitoring_thread
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            worker.join(timeout=5)

    def _monitoring_loop(self, stop_event: Optional[threading.Event] = None):
        """Main monitoring loop.

        Args:
            stop_event: Event that terminates this loop when set.  Defaults to
                the monitor's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        def should_stop() -> bool:
            # The public flag is honoured as well so that code setting
            # ``stop_monitoring`` directly keeps working.
            return self.stop_monitoring or stop_event.is_set()

        while not should_stop():
            try:
                with self._lock:
                    build_ids = list(self.monitored_builds)
                for build_id in build_ids:
                    if should_stop():
                        break
                    self._update_build_status(build_id)
            except Exception as e:
                print(f"Monitoring loop error: {e}")
            # Interruptible sleep: returns as soon as the stop event is set.
            stop_event.wait(self.poll_interval)

    def _update_build_status(self, build_id: str):
        """Update status for a specific build and notify on status change."""
        try:
            status_data = self.client.get_compose_status(build_id)
            new_status = self._convert_to_build_status(status_data, build_id)
        except Exception as e:
            print(f"Failed to update status for {build_id}: {e}")
            return

        with self._lock:
            old_status = self.monitored_builds.get(build_id)
            if old_status is None:
                # The build was removed while its status was being fetched;
                # storing the result would silently re-add it.
                return
            self.monitored_builds[build_id] = new_status

        # Callbacks run outside the lock so they may call back into the monitor.
        if old_status.status != new_status.status:
            self._notify_status_change(new_status)

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime:
        """Return *value* as a datetime, falling back to the current time.

        Accepts datetime objects and ISO 8601 strings (a trailing ``Z`` is
        treated as ``+00:00``).  Missing or unparsable values yield
        ``datetime.now()`` instead of raising.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str) and value:
            text = value[:-1] + '+00:00' if value[-1] in 'Zz' else value
            try:
                return datetime.fromisoformat(text)
            except ValueError:
                pass
        return datetime.now()

    def _convert_to_build_status(self, status_data, build_id: str = '') -> BuildStatus:
        """Convert composer status data to our BuildStatus format.

        Args:
            status_data: Dict-like payload returned by the composer client.
            build_id: Fallback id used when the payload carries no ``id``.
        """
        return BuildStatus(
            build_id=status_data.get('id') or build_id,
            status=status_data.get('status') or 'unknown',
            created_at=self._parse_timestamp(status_data.get('created_at')),
            updated_at=datetime.now(),
            blueprint=status_data.get('blueprint', ''),
            target=status_data.get('image_type', ''),
            architecture=status_data.get('arch', ''),
            progress=self._parse_progress(status_data.get('progress', {})),
            logs=status_data.get('logs') or [],
            metadata=status_data.get('metadata', {})
        )

    @staticmethod
    def _to_fraction(value: Any) -> float:
        """Coerce a progress value to float; unusable values become 0.0."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    def _parse_progress(self, progress_data: Dict) -> List[BuildProgress]:
        """Parse progress data into BuildProgress objects.

        Expects a mapping of stage name to a dict with optional ``progress``
        and ``message`` keys; anything else is ignored.
        """
        if not isinstance(progress_data, dict):
            return []
        return [
            BuildProgress(
                stage=stage,
                progress=self._to_fraction(data.get('progress', 0.0)),
                message=data.get('message', ''),
                timestamp=datetime.now(),
                details=data
            )
            for stage, data in progress_data.items()
            if isinstance(data, dict)
        ]

    def _notify_status_change(self, build_status: BuildStatus):
        """Notify all callbacks of status change.

        A failing callback is reported and does not prevent the remaining
        callbacks from running.
        """
        for callback in list(self.status_callbacks):
            try:
                callback(build_status)
            except Exception as e:
                print(f"Callback error: {e}")

    def get_build_status(self, build_id: str) -> Optional[BuildStatus]:
        """Get current status of a monitored build, or None if unknown."""
        with self._lock:
            return self.monitored_builds.get(build_id)

    def get_all_statuses(self) -> List[BuildStatus]:
        """Get status of all monitored builds."""
        with self._lock:
            return list(self.monitored_builds.values())

    def get_builds_by_status(self, status: str) -> List[BuildStatus]:
        """Get all builds with a specific status."""
        return [build for build in self.get_all_statuses() if build.status == status]


class StatusNotifier:
    """Handles status notifications and alerts."""

    def __init__(self):
        self.notification_handlers: Dict[str, Callable] = {}
        self.notification_history: List[Dict] = []

    def add_notification_handler(self, notification_type: str, handler: Callable):
        """Add a handler for a specific notification type.

        A later registration for the same type replaces the earlier one.
        """
        self.notification_handlers[notification_type] = handler

    def notify(self, notification_type: str, message: str, data: Optional[Dict] = None):
        """Record a notification and pass it to the matching handler, if any."""
        notification = {
            'type': notification_type,
            'message': message,
            'data': data,
            'timestamp': datetime.now().isoformat()
        }

        # Store in history
        self.notification_history.append(notification)

        # Send to handler if exists
        handler = self.notification_handlers.get(notification_type)
        if handler is not None:
            try:
                handler(notification)
            except Exception as e:
                print(f"Notification handler error: {e}")

    def get_notification_history(self, limit: Optional[int] = None) -> List[Dict]:
        """Get notification history, oldest first.

        Args:
            limit: Maximum number of most recent entries to return; None
                returns everything, zero or a negative value returns nothing.

        Returns:
            A new list; mutating it does not affect the stored history.
        """
        if limit is None:
            return list(self.notification_history)
        if limit <= 0:
            # history[-0:] would be the whole list, so handle this explicitly.
            return []
        return self.notification_history[-limit:]


class ConsoleStatusDisplay:
    """Console-based status display."""

    def __init__(self):
        # Maps build id -> "<build id>:<status>" of the last printed update.
        self.last_display: Dict[str, str] = {}

    def display_build_status(self, build_status: BuildStatus):
        """Display build status in console.

        Nothing is printed when the build's status is the same as the one
        printed last time for that build.
        """
        status_id = f"{build_status.build_id}:{build_status.status}"
        if status_id == self.last_display.get(build_status.build_id):
            return

        print("\n=== Build Status Update ===")
        print(f"Build ID: {build_status.build_id}")
        print(f"Status: {build_status.status}")
        print(f"Blueprint: {build_status.blueprint}")
        print(f"Target: {build_status.target}")
        print(f"Architecture: {build_status.architecture}")
        print(f"Created: {build_status.created_at}")
        print(f"Updated: {build_status.updated_at}")

        if build_status.progress:
            print("Progress:")
            for prog in build_status.progress:
                print(f"  {prog.stage}: {prog.progress:.1%} - {prog.message}")

        if build_status.logs:
            print("Recent Logs:")
            for log in build_status.logs[-3:]:  # Show last 3 logs
                print(f"  {log}")

        print("=" * 30)
        self.last_display[build_status.build_id] = status_id


def main():
    """Example usage of status monitoring."""
    # This would be used with an actual composer client
    print("Status Monitor Example")
    print("This module provides status monitoring for composer builds")


if __name__ == '__main__':
    main()