#!/usr/bin/python3
"""
Debian Forge Build Orchestrator

Basic build queue management and OSBuild pipeline execution for Debian atomic builds.
"""

import os
import sys
import json
import shlex
import time
import threading
import subprocess
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum


class BuildStatus(Enum):
    """Lifecycle states a build request can be in."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class BuildRequest:
    """A single build request together with its bookkeeping fields."""

    id: str
    manifest_path: str
    priority: int
    status: BuildStatus
    submitted_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None
    output_dir: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None


class BuildQueue:
    """Simple build queue with priority-based scheduling.

    Pending requests live in a list kept sorted by descending priority;
    running and finished requests are tracked in dictionaries keyed by
    build id.  Every public method takes ``self.lock`` while it touches
    these containers.
    """

    def __init__(self):
        self.queue: List[BuildRequest] = []
        self.running_builds: Dict[str, BuildRequest] = {}
        self.completed_builds: Dict[str, BuildRequest] = {}
        self.lock = threading.Lock()
        self.next_id = 1

    def submit_build(self, manifest_path: str, priority: int = 5,
                     metadata: Optional[Dict[str, Any]] = None) -> str:
        """Submit a new build request.

        Args:
            manifest_path: Path of the manifest to build.
            priority: Scheduling priority; higher values are served first.
            metadata: Optional free-form data stored on the request
                (an empty dict is stored when omitted).

        Returns:
            The generated build id, e.g. ``"build-000001"``.
        """
        with self.lock:
            build_id = f"build-{self.next_id:06d}"
            self.next_id += 1

            request = BuildRequest(
                id=build_id,
                manifest_path=manifest_path,
                priority=priority,
                status=BuildStatus.PENDING,
                submitted_at=datetime.now(),
                metadata=metadata or {},
            )

            self.queue.append(request)
            # Sort by priority (higher priority first).  list.sort is
            # stable, so equal priorities keep submission order.
            self.queue.sort(key=lambda x: x.priority, reverse=True)

        print(f"Submitted build {build_id} with priority {priority}")
        return build_id

    def get_next_build(self) -> Optional[BuildRequest]:
        """Pop the highest-priority pending request and mark it RUNNING.

        Returns:
            The request that was moved to ``running_builds``, or ``None``
            when nothing is pending.
        """
        with self.lock:
            if not self.queue:
                return None

            request = self.queue.pop(0)
            request.status = BuildStatus.RUNNING
            request.started_at = datetime.now()
            self.running_builds[request.id] = request
            return request

    def mark_completed(self, build_id: str, output_dir: str, success: bool = True,
                       error_message: Optional[str] = None):
        """Move a running build to the completed set.

        Does nothing when ``build_id`` is not currently running.

        Args:
            build_id: Id of the running build.
            output_dir: Directory recorded as the build's output location.
            success: ``True`` marks the build COMPLETED, ``False`` FAILED.
            error_message: Stored on the request only when ``success`` is
                ``False``.
        """
        with self.lock:
            if build_id not in self.running_builds:
                return

            request = self.running_builds.pop(build_id)
            request.completed_at = datetime.now()
            request.output_dir = output_dir

            if success:
                request.status = BuildStatus.COMPLETED
            else:
                request.status = BuildStatus.FAILED
                request.error_message = error_message

            self.completed_builds[build_id] = request

        print(f"Build {build_id} completed with status: {request.status.value}")

    def get_status(self, build_id: str) -> Optional[BuildRequest]:
        """Look up a build in the pending, running and completed sets.

        Returns:
            The matching request object, or ``None`` if the id is unknown.
        """
        with self.lock:
            # Check all queues
            for request in self.queue:
                if request.id == build_id:
                    return request

            if build_id in self.running_builds:
                return self.running_builds[build_id]

            if build_id in self.completed_builds:
                return self.completed_builds[build_id]

            return None

    def list_builds(self) -> Dict[str, List[BuildRequest]]:
        """Return snapshot lists of all builds, grouped by state.

        The result has the keys ``"pending"``, ``"running"`` and
        ``"completed"`` (the latter also holds failed builds).
        """
        with self.lock:
            return {
                "pending": self.queue.copy(),
                "running": list(self.running_builds.values()),
                "completed": list(self.completed_builds.values()),
            }


class OSBuildExecutor:
    """Execute OSBuild pipelines"""

    def __init__(self, osbuild_path: str = "python3 -m osbuild"):
        # Launcher command line for OSBuild; may contain several
        # shell-style words (interpreter, flags, module name).
        self.osbuild_path = osbuild_path

    def _build_command(self, manifest_path: str, output_dir: str) -> List[str]:
        """Return the argv list used to invoke OSBuild.

        Only the launcher prefix is tokenised (with shell-like quoting
        rules); ``manifest_path`` and ``output_dir`` are passed through as
        single arguments so that paths containing whitespace survive.
        """
        return [
            *shlex.split(self.osbuild_path),
            "--libdir", ".",
            "--output-dir", output_dir,
            manifest_path,
        ]

    def execute_pipeline(self, manifest_path: str, output_dir: str) -> tuple[bool, Optional[str]]:
        """Execute an OSBuild pipeline.

        Args:
            manifest_path: Manifest handed to OSBuild.
            output_dir: Directory passed as ``--output-dir``; created if
                missing.

        Returns:
            ``(True, None)`` when OSBuild exits with status 0, otherwise
            ``(False, message)`` describing the failure.

        Raises:
            OSError: If the output directory cannot be created.
        """
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        try:
            cmd = self._build_command(manifest_path, output_dir)
            print(f"Executing OSBuild: {shlex.join(cmd)}")

            # Run OSBuild
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=os.getcwd(),
            )
        except Exception as e:
            # Launch problems (missing interpreter, malformed launcher
            # string, ...) are reported through the return value, not
            # raised, so callers get a uniform (success, message) result.
            return False, f"Failed to execute OSBuild: {str(e)}"

        if result.returncode == 0:
            print("OSBuild pipeline completed successfully")
            return True, None

        error_msg = f"OSBuild failed with return code {result.returncode}"
        if result.stderr:
            error_msg += f"\nStderr: {result.stderr}"
        return False, error_msg


class BuildOrchestrator:
    """Main build orchestration system.

    Owns a :class:`BuildQueue` and an :class:`OSBuildExecutor` and runs a
    single daemon worker thread that processes queued builds one at a time.
    """

    def __init__(self, osbuild_path: str = "python3 -m osbuild"):
        self.queue = BuildQueue()
        self.executor = OSBuildExecutor(osbuild_path)
        self.running = False
        self.worker_thread: Optional[threading.Thread] = None

    def start(self):
        """Start the build orchestrator (no-op if already running)."""
        if self.running:
            return

        self.running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        print("Build orchestrator started")

    def stop(self):
        """Stop the build orchestrator and wait for the worker to exit."""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join()
        print("Build orchestrator stopped")

    def _worker_loop(self):
        """Main worker loop for processing builds.

        Polls the queue once per second while idle.  Each build's outcome
        is always recorded via ``mark_completed``, including when the
        executor raises, so a build never stays RUNNING forever.
        """
        while self.running:
            # Get next build
            request = self.queue.get_next_build()
            if not request:
                time.sleep(1)
                continue

            print(f"Processing build {request.id}")

            # Create output directory
            output_dir = f"builds/{request.id}"

            # Execute build
            try:
                success, error_message = self.executor.execute_pipeline(
                    request.manifest_path, output_dir
                )
            except Exception as exc:
                # Thread boundary: an escaping exception would kill the
                # worker and strand this build in the RUNNING state.
                success = False
                error_message = f"Unexpected error while executing build: {exc}"

            # Mark build as completed
            self.queue.mark_completed(
                request.id, output_dir, success, error_message
            )

    def submit_build(self, manifest_path: str, priority: int = 5,
                     metadata: Optional[Dict[str, Any]] = None) -> str:
        """Submit a new build request and return its build id."""
        return self.queue.submit_build(manifest_path, priority, metadata)

    def get_build_status(self, build_id: str) -> Optional[BuildRequest]:
        """Get the status of a specific build (``None`` if unknown)."""
        return self.queue.get_status(build_id)

    def list_builds(self) -> Dict[str, List[BuildRequest]]:
        """List all builds"""
        return self.queue.list_builds()


def main():
    """Main function for command-line usage.

    Expects the manifest path as the first argument and an optional
    integer priority (default 5) as the second.  Submits one build, then
    polls its status every five seconds until it reaches a terminal state
    or the user interrupts with Ctrl-C.
    """
    usage = "Usage: python build-orchestrator.py <manifest_path> [priority]"

    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)

    manifest_path = sys.argv[1]
    try:
        priority = int(sys.argv[2]) if len(sys.argv) > 2 else 5
    except ValueError:
        print(f"Invalid priority {sys.argv[2]!r}: expected an integer")
        print(usage)
        sys.exit(1)

    # Create orchestrator
    orchestrator = BuildOrchestrator()

    # Submit build
    build_id = orchestrator.submit_build(manifest_path, priority)
    print(f"Submitted build {build_id}")

    # Start orchestrator
    orchestrator.start()

    # States after which the build will not change any more.
    terminal_states = (
        BuildStatus.COMPLETED,
        BuildStatus.FAILED,
        BuildStatus.CANCELLED,
    )

    try:
        # Monitor build
        while True:
            status = orchestrator.get_build_status(build_id)
            if status:
                print(f"Build {build_id}: {status.status.value}")

                if status.status in terminal_states:
                    if status.status == BuildStatus.FAILED:
                        print(f"Build failed: {status.error_message}")
                    break

            time.sleep(5)

    except KeyboardInterrupt:
        print("\nStopping orchestrator...")

    orchestrator.stop()


if __name__ == "__main__":
    main()