debian-forge/build_orchestrator.py
robojerk 48c31fa24f
Some checks are pending
Checks / Spelling (push) Waiting to run
Checks / Python Linters (push) Waiting to run
Checks / Shell Linters (push) Waiting to run
Checks / 📦 Packit config lint (push) Waiting to run
Checks / 🔍 Check for valid snapshot urls (push) Waiting to run
Checks / 🔍 Check JSON files for formatting consistency (push) Waiting to run
Generate / Documentation (push) Waiting to run
Generate / Test Data (push) Waiting to run
Tests / Unittest (push) Waiting to run
Tests / Assembler test (legacy) (push) Waiting to run
Tests / Smoke run: unittest as normal user on default runner (push) Waiting to run
Implement enhanced build orchestration and artifact management
- Add build status tracking with state machine
- Implement build logging and monitoring system
- Add build progress tracking and cancellation support
- Create artifact management system with SQLite database
- Fix stage file extensions for proper Python imports
- Enhance resource allocation with actual resource tracking
- Add comprehensive testing for all components
2025-08-22 18:45:17 -07:00

559 lines
22 KiB
Python
Executable file

#!/usr/bin/python3
"""
Debian Forge Build Orchestrator
Enhanced build queue management and OSBuild pipeline execution for Debian atomic builds.
"""
import json
import logging
import os
import shlex
import subprocess
import sys
import threading
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

import psutil
class BuildStatus(Enum):
    """Every state a build can be in.

    The member values are the lowercase strings that ``BuildRequest.to_dict``
    emits for the ``status`` key.
    """

    # Generic lifecycle states.
    PENDING = "pending"
    RUNNING = "running"

    # Terminal states (the state machine allows no transitions out of them).
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

    # Fine-grained phases a dequeued build moves through.
    QUEUED = "queued"
    PREPARING = "preparing"
    BUILDING = "building"
    FINALIZING = "finalizing"
@dataclass
class BuildRequest:
    """A single build submission together with its mutable runtime state.

    ``status`` holds a ``BuildStatus``; the timestamps are naive local
    ``datetime`` values taken with ``datetime.now()``.
    """

    id: str
    manifest_path: str
    priority: int
    status: BuildStatus
    submitted_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None
    output_dir: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    resource_requirements: Optional[Dict[str, Any]] = None
    # Timestamped human-readable log lines, appended via add_log().
    # default_factory gives every instance its own list.
    logs: List[str] = field(default_factory=list)
    progress: float = 0.0  # 0.0 to 1.0

    def __post_init__(self):
        # Callers may still pass ``logs=None`` explicitly; normalise to a fresh list.
        if self.logs is None:
            self.logs = []

    def add_log(self, message: str):
        """Append *message* to ``logs``, prefixed with a ``[YYYY-mm-dd HH:MM:SS]`` stamp."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.logs.append(f"[{timestamp}] {message}")

    def update_progress(self, progress: float):
        """Set ``progress``, clamped to the closed range [0.0, 1.0]."""
        self.progress = max(0.0, min(1.0, progress))

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dict for serialization.

        ``status`` becomes its string value and every set timestamp becomes an
        ISO-8601 string; unset timestamps stay ``None``.
        """
        data = asdict(self)
        data['status'] = self.status.value
        data['submitted_at'] = self.submitted_at.isoformat()
        if self.started_at:
            data['started_at'] = self.started_at.isoformat()
        if self.completed_at:
            data['completed_at'] = self.completed_at.isoformat()
        return data
class BuildStateMachine:
    """State machine for build lifecycle management.

    ``transitions`` maps each status to the statuses it may move to next.
    COMPLETED, FAILED and CANCELLED have no outgoing edges (terminal).
    """

    def __init__(self):
        self.transitions = {
            BuildStatus.PENDING: [BuildStatus.QUEUED, BuildStatus.CANCELLED],
            BuildStatus.QUEUED: [BuildStatus.PREPARING, BuildStatus.CANCELLED],
            # NOTE(review): PREPARING/BUILDING/FINALIZING have no CANCELLED edge,
            # so a build that has left the queue cannot be cancelled. RUNNING
            # allows it, but no entry in this table leads into RUNNING.
            BuildStatus.PREPARING: [BuildStatus.BUILDING, BuildStatus.FAILED],
            BuildStatus.BUILDING: [BuildStatus.FINALIZING, BuildStatus.FAILED],
            BuildStatus.FINALIZING: [BuildStatus.COMPLETED, BuildStatus.FAILED],
            BuildStatus.RUNNING: [BuildStatus.COMPLETED, BuildStatus.FAILED, BuildStatus.CANCELLED],
            BuildStatus.COMPLETED: [],
            BuildStatus.FAILED: [],
            BuildStatus.CANCELLED: []
        }

    def can_transition(self, from_status: BuildStatus, to_status: BuildStatus) -> bool:
        """Return True if moving from *from_status* to *to_status* is allowed.

        Unknown source statuses allow no transitions.
        """
        return to_status in self.transitions.get(from_status, ())

    def get_valid_transitions(self, current_status: BuildStatus) -> List[BuildStatus]:
        """Return the statuses reachable from *current_status* in one step.

        A new list is returned so callers cannot mutate the shared table.
        """
        return list(self.transitions.get(current_status, ()))
class BuildLogger:
    """Build logging and monitoring system.

    Each event is emitted on the ``debian-forge-builds`` logger and also
    appended to a per-build file ``<log_dir>/<build_id>.log``. Files are read
    and written as UTF-8 so non-ASCII text (e.g. tool stderr embedded in a
    failure message) cannot raise an encoding error under a non-UTF-8 locale.
    """

    def __init__(self, log_dir: str = "build-logs"):
        self.log_dir = log_dir
        os.makedirs(log_dir, exist_ok=True)
        # NOTE(review): basicConfig configures the *root* logger and is a no-op
        # once the root logger already has handlers.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger('debian-forge-builds')

    def _log_path(self, build_id: str) -> str:
        """Return the per-build log file path for *build_id*."""
        return os.path.join(self.log_dir, f"{build_id}.log")

    def log_build_event(self, build_id: str, event: str, details: Optional[str] = None):
        """Log ``Build <id>: <event>[ - <details>]`` to the logger and the build's file."""
        message = f"Build {build_id}: {event}"
        if details:
            message += f" - {details}"
        self.logger.info(message)
        with open(self._log_path(build_id), 'a', encoding='utf-8') as f:
            f.write(f"{datetime.now().isoformat()} - {message}\n")

    def get_build_logs(self, build_id: str) -> List[str]:
        """Return all lines (newlines kept) of a build's log file, or [] if it has none."""
        try:
            with open(self._log_path(build_id), 'r', encoding='utf-8') as f:
                return f.readlines()
        except FileNotFoundError:
            return []

    def stream_build_logs(self, build_id: str):
        """Yield a build's log lines with surrounding whitespace stripped.

        The file is read once; lines written after iteration finishes are not
        followed. Yields nothing if the build has no log file.
        """
        try:
            f = open(self._log_path(build_id), 'r', encoding='utf-8')
        except FileNotFoundError:
            return
        with f:
            for line in f:
                yield line.strip()
class ResourceManager:
    """Manage system resources for builds.

    Availability is live system capacity (sampled with psutil) minus the
    reservations held in ``allocated_resources`` for builds that have been
    allocated and not yet released.
    """

    def __init__(self):
        self.max_cpu_percent = 80  # Maximum CPU usage per build; default when unspecified
        self.max_memory_gb = 4  # Maximum memory per build (GB); default when unspecified
        self.max_storage_gb = 10  # Maximum storage per build (GB); default when unspecified
        # NOTE(review): the reserved_* values are not consulted anywhere in this class.
        self.reserved_cpu_percent = 20  # Reserved CPU for system
        self.reserved_memory_gb = 2  # Reserved memory for system
        self.allocated_resources: Dict[str, Dict[str, Any]] = {}

    def _effective_requirements(self, requirements: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return a copy of *requirements* with the per-build maximums filled in for missing keys."""
        effective = dict(requirements or {})
        effective.setdefault("cpu_percent", self.max_cpu_percent)
        effective.setdefault("memory_gb", self.max_memory_gb)
        effective.setdefault("storage_gb", self.max_storage_gb)
        return effective

    def get_available_resources(self) -> Dict[str, Any]:
        """Get current available system resources.

        Blocks for about one second while psutil samples CPU usage. Disk space
        is measured on the ``/`` filesystem.
        """
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        # Snapshot the reservations first: the worker thread may release a
        # build while another thread is summing, which would otherwise raise
        # "dictionary changed size during iteration".
        reservations = list(self.allocated_resources.values())
        allocated_cpu = sum(r.get('cpu_percent', 0) for r in reservations)
        allocated_memory = sum(r.get('memory_gb', 0) for r in reservations)
        allocated_storage = sum(r.get('storage_gb', 0) for r in reservations)
        return {
            "cpu_percent": max(0, 100 - cpu_percent - allocated_cpu),
            "memory_gb": max(0, memory.available / (1024**3) - allocated_memory),
            "storage_gb": max(0, disk.free / (1024**3) - allocated_storage)
        }

    def can_allocate_resources(self, requirements: Dict[str, Any]) -> bool:
        """Return True if the current availability covers *requirements*.

        Missing keys (or ``None`` requirements) default to the per-build maximums.
        """
        needed = self._effective_requirements(requirements)
        available = self.get_available_resources()
        return all(available[key] >= needed[key] for key in ("cpu_percent", "memory_gb", "storage_gb"))

    def allocate_resources(self, build_id: str, requirements: Dict[str, Any]) -> bool:
        """Reserve resources for *build_id*; return False (reserving nothing) if unavailable."""
        if not self.can_allocate_resources(requirements):
            return False
        # Record exactly what was checked, defaults included, so the
        # reservation counts against later availability checks.
        self.allocated_resources[build_id] = self._effective_requirements(requirements)
        return True

    def release_resources(self, build_id: str):
        """Release resources after build completion; unknown ids are ignored."""
        self.allocated_resources.pop(build_id, None)
class BuildQueue:
    """Simple build queue with priority-based scheduling.

    A build lives in exactly one of ``queue`` (waiting), ``running_builds``
    or ``completed_builds`` (terminal). All three, and every status change,
    are guarded by ``self.lock``. That lock is a non-reentrant
    ``threading.Lock``, so code already holding it must use the ``*_locked``
    helpers instead of the public methods.
    """

    def __init__(self):
        self.queue: List[BuildRequest] = []
        self.running_builds: Dict[str, BuildRequest] = {}
        self.completed_builds: Dict[str, BuildRequest] = {}
        self.lock = threading.Lock()
        self.next_id = 1
        self.resource_manager = ResourceManager()
        self.state_machine = BuildStateMachine()
        self.logger = BuildLogger()

    def submit_build(self, manifest_path: str, priority: int = 5,
                     metadata: Optional[Dict[str, Any]] = None,
                     resource_requirements: Optional[Dict[str, Any]] = None) -> str:
        """Queue a new PENDING build and return its id (``build-NNNNNN``)."""
        with self.lock:
            build_id = f"build-{self.next_id:06d}"
            self.next_id += 1
            request = BuildRequest(
                id=build_id,
                manifest_path=manifest_path,
                priority=priority,
                status=BuildStatus.PENDING,
                submitted_at=datetime.now(),
                metadata=metadata or {},
                resource_requirements=resource_requirements or {}
            )
            self.queue.append(request)
            # Higher priority first; list.sort is stable even with reverse=True,
            # so equal priorities keep submission (FIFO) order.
            self.queue.sort(key=lambda x: x.priority, reverse=True)
            self.logger.log_build_event(build_id, "Build submitted", f"Priority: {priority}")
            request.add_log(f"Build submitted with priority {priority}")
            print(f"Submitted build {build_id} with priority {priority}")
            return build_id

    def get_next_build(self) -> Optional[BuildRequest]:
        """Dequeue the highest-priority build whose resources can be reserved.

        The returned build has been moved to ``running_builds`` in PREPARING
        status with its resources reserved. Returns None if no queued build
        fits. Each resource check samples CPU for about one second while the
        lock is held.
        """
        with self.lock:
            for index, request in enumerate(self.queue):
                # allocate_resources() both checks and records the reservation,
                # so a build can never start without one (and CPU is sampled
                # once per candidate rather than twice).
                if not self.resource_manager.allocate_resources(request.id, request.resource_requirements):
                    continue
                del self.queue[index]  # safe: we return before iterating further
                if self.state_machine.can_transition(request.status, BuildStatus.QUEUED):
                    request.status = BuildStatus.QUEUED
                    request.add_log("Build queued for execution")
                if self.state_machine.can_transition(request.status, BuildStatus.PREPARING):
                    request.status = BuildStatus.PREPARING
                    request.add_log("Build preparation started")
                request.started_at = datetime.now()
                self.running_builds[request.id] = request
                self.logger.log_build_event(request.id, "Build started", "Resources allocated")
                return request
            return None

    def _set_status_locked(self, build_id: str, new_status: BuildStatus,
                           progress: Optional[float] = None, message: Optional[str] = None) -> bool:
        """Apply a validated status change to a running build; caller must hold ``self.lock``.

        Returns True if the change was applied. Unknown/non-running ids are
        ignored; invalid transitions are reported and ignored. A terminal
        status finalizes the build.
        """
        request = self.running_builds.get(build_id)
        if request is None:
            return False
        if not self.state_machine.can_transition(request.status, new_status):
            print(f"Invalid status transition: {request.status.value} -> {new_status.value}")
            return False
        old_status = request.status
        request.status = new_status
        if progress is not None:
            request.update_progress(progress)
        if message:
            request.add_log(message)
        self.logger.log_build_event(build_id, f"Status changed: {old_status.value} -> {new_status.value}", message)
        if new_status in (BuildStatus.COMPLETED, BuildStatus.FAILED, BuildStatus.CANCELLED):
            self._finalize_build(build_id, new_status)
        return True

    def update_build_status(self, build_id: str, new_status: BuildStatus,
                            progress: Optional[float] = None, message: Optional[str] = None):
        """Update a running build's status, validated by the state machine."""
        with self.lock:
            self._set_status_locked(build_id, new_status, progress, message)

    def _finalize_build(self, build_id: str, final_status: BuildStatus):
        """Move a running build to ``completed_builds`` and release its resources.

        Caller must hold ``self.lock``.
        """
        if build_id in self.running_builds:
            request = self.running_builds.pop(build_id)
            request.completed_at = datetime.now()
            request.update_progress(1.0 if final_status == BuildStatus.COMPLETED else 0.0)
            self.resource_manager.release_resources(build_id)
            self.completed_builds[build_id] = request
            self.logger.log_build_event(build_id, "Build completed", f"Final status: {final_status.value}")
            print(f"Build {build_id} completed with status: {final_status.value}")

    def mark_completed(self, build_id: str, output_dir: str, success: bool = True, error_message: Optional[str] = None):
        """Mark a running build COMPLETED (``success``) or FAILED.

        When the transition is valid, ``output_dir`` (and ``error_message`` on
        failure) are stored on the request before its status changes, so a
        reader that sees the terminal status also sees these fields.
        """
        target = BuildStatus.COMPLETED if success else BuildStatus.FAILED
        with self.lock:
            request = self.running_builds.get(build_id)
            if request is not None and self.state_machine.can_transition(request.status, target):
                request.output_dir = output_dir
                if not success:
                    request.error_message = error_message
            if success:
                self._set_status_locked(build_id, BuildStatus.COMPLETED, 1.0, "Build completed successfully")
            else:
                self._set_status_locked(build_id, BuildStatus.FAILED, 0.0, f"Build failed: {error_message}")

    def cancel_build(self, build_id: str) -> bool:
        """Cancel a pending or running build; return True if it was cancelled."""
        with self.lock:
            for index, request in enumerate(self.queue):
                if request.id == build_id:
                    del self.queue[index]
                    request.status = BuildStatus.CANCELLED
                    request.completed_at = datetime.now()
                    request.add_log("Build cancelled by user")
                    self.completed_builds[build_id] = request
                    self.logger.log_build_event(build_id, "Build cancelled", "User requested cancellation")
                    return True
            # Running builds go through the state machine. Use the lock-held
            # helper: calling update_build_status() here would try to
            # re-acquire the non-reentrant lock and deadlock.
            request = self.running_builds.get(build_id)
            if request is not None and self.state_machine.can_transition(request.status, BuildStatus.CANCELLED):
                return self._set_status_locked(build_id, BuildStatus.CANCELLED, 0.0, "Build cancelled by user")
            return False

    def get_status(self, build_id: str) -> Optional[BuildRequest]:
        """Return the build with *build_id* from any collection, or None."""
        with self.lock:
            for request in self.queue:
                if request.id == build_id:
                    return request
            found = self.running_builds.get(build_id)
            if found is None:
                found = self.completed_builds.get(build_id)
            return found

    def list_builds(self) -> Dict[str, List[BuildRequest]]:
        """Return snapshot lists of builds keyed by ``pending``/``running``/``completed``."""
        with self.lock:
            return {
                "pending": self.queue.copy(),
                "running": list(self.running_builds.values()),
                "completed": list(self.completed_builds.values())
            }

    def get_resource_status(self) -> Dict[str, Any]:
        """Get current resource availability plus queue/running/allocation counts."""
        # Sampled outside the lock: it blocks for about a second.
        available = self.resource_manager.get_available_resources()
        with self.lock:
            return {
                "available_resources": available,
                "running_builds": len(self.running_builds),
                "queue_length": len(self.queue),
                "allocated_resources": len(self.resource_manager.allocated_resources)
            }
class OSBuildExecutor:
    """Execute OSBuild pipelines as a subprocess."""

    def __init__(self, osbuild_path: str = "python3 -m osbuild"):
        # Command prefix; split shell-style, so quoted paths containing spaces work.
        self.osbuild_path = osbuild_path

    def execute_pipeline(self, manifest_path: str, output_dir: str) -> tuple[bool, Optional[str]]:
        """Run OSBuild on *manifest_path*, writing into *output_dir*.

        The output directory is created if needed. Returns ``(True, None)`` on
        exit status 0, otherwise ``(False, message)``; the message includes
        stderr when there is any. Launch errors are reported the same way
        instead of being raised.
        """
        try:
            # Build an argv list (no shell, no whitespace splitting) so paths
            # containing spaces are passed through intact.
            argv = shlex.split(self.osbuild_path) + [
                "--libdir", ".", "--output-dir", output_dir, manifest_path,
            ]
            os.makedirs(output_dir, exist_ok=True)
            print(f"Executing OSBuild: {shlex.join(argv)}")
            # errors="replace": undecodable tool output must not turn a
            # successful build into a reported failure.
            result = subprocess.run(argv, capture_output=True, text=True, errors="replace")
        except Exception as e:
            return False, f"Failed to execute OSBuild: {str(e)}"
        if result.returncode != 0:
            error_msg = f"OSBuild failed with return code {result.returncode}"
            if result.stderr:
                error_msg += f"\nStderr: {result.stderr}"
            return False, error_msg
        print("OSBuild pipeline completed successfully")
        return True, None
class BuildOrchestrator:
    """Main build orchestration system.

    Owns a BuildQueue and an OSBuildExecutor and runs one background worker
    thread that executes queued builds sequentially.
    """

    def __init__(self, osbuild_path: str = "python3 -m osbuild"):
        self.queue = BuildQueue()
        self.executor = OSBuildExecutor(osbuild_path)
        self.running = False
        self.worker_thread = None

    def start(self):
        """Start the daemon worker thread (no-op if already running)."""
        if self.running:
            return
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        print("Build orchestrator started")

    def stop(self):
        """Ask the worker to stop and wait for it to exit.

        The worker checks the flag only between builds, so this blocks until
        any in-progress build has finished.
        """
        self.running = False
        if self.worker_thread:
            self.worker_thread.join()
        print("Build orchestrator stopped")

    def _worker_loop(self):
        """Process builds until stopped, polling the queue once a second when idle."""
        while self.running:
            request = self.queue.get_next_build()
            if not request:
                time.sleep(1)
                continue
            try:
                self._process_build(request)
            except Exception as exc:
                # This is the only worker thread; an unexpected error must not
                # silently kill it and stall the whole queue.
                print(f"Unexpected error while processing build {request.id}: {exc}")

    def _process_build(self, request: BuildRequest):
        """Run one dequeued build: BUILDING, then FINALIZING+COMPLETED or FAILED."""
        print(f"Processing build {request.id}")
        output_dir = f"builds/{request.id}"
        try:
            self.queue.update_build_status(request.id, BuildStatus.BUILDING, 0.1, "OSBuild pipeline started")
            success, error_message = self.executor.execute_pipeline(request.manifest_path, output_dir)
        except Exception as exc:
            # Record unexpected failures as a FAILED build rather than leaving
            # it in a non-terminal state with its resources still reserved.
            success, error_message = False, f"Unexpected error during build: {exc}"
        if success:
            self.queue.update_build_status(request.id, BuildStatus.FINALIZING, 0.9, "OSBuild pipeline completed")
            self.queue.mark_completed(request.id, output_dir, True)
        else:
            self.queue.mark_completed(request.id, output_dir, False, error_message)

    def submit_build(self, manifest_path: str, priority: int = 5,
                     metadata: Optional[Dict[str, Any]] = None,
                     resource_requirements: Optional[Dict[str, Any]] = None) -> str:
        """Submit a new build request; returns its build id."""
        return self.queue.submit_build(manifest_path, priority, metadata, resource_requirements)

    def get_build_status(self, build_id: str) -> Optional[BuildRequest]:
        """Get the status of a specific build (None if unknown)."""
        return self.queue.get_status(build_id)

    def list_builds(self) -> Dict[str, List[BuildRequest]]:
        """List all builds grouped as pending/running/completed."""
        return self.queue.list_builds()

    def get_resource_status(self) -> Dict[str, Any]:
        """Get current resource status (blocks about a second for CPU sampling)."""
        return self.queue.get_resource_status()

    def cancel_build(self, build_id: str) -> bool:
        """Cancel a build; returns True if it was cancelled."""
        return self.queue.cancel_build(build_id)

    def get_build_logs(self, build_id: str) -> List[str]:
        """Get the persisted log-file lines for a specific build."""
        return self.queue.logger.get_build_logs(build_id)
def main():
    """Command-line entry point: submit one manifest, run it, and poll until it ends.

    Arguments: <manifest_path> [priority] [cpu_percent] [memory_gb] [storage_gb].
    The optional integers default to 5, 50, 2 and 5. Exits with status 1 on
    missing or non-integer arguments.
    """
    usage = "Usage: python build_orchestrator.py <manifest_path> [priority] [cpu_percent] [memory_gb] [storage_gb]"
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)
    manifest_path = sys.argv[1]

    # (name, default) for each optional positional integer argument, in order.
    optional_args = (("priority", 5), ("cpu_percent", 50), ("memory_gb", 2), ("storage_gb", 5))
    parsed = []
    for position, (name, default) in enumerate(optional_args, start=2):
        if len(sys.argv) <= position:
            parsed.append(default)
            continue
        try:
            parsed.append(int(sys.argv[position]))
        except ValueError:
            print(f"Invalid {name}: {sys.argv[position]!r} is not an integer")
            print(usage)
            sys.exit(1)
    priority, cpu_percent, memory_gb, storage_gb = parsed

    orchestrator = BuildOrchestrator()
    resource_reqs = {
        "cpu_percent": cpu_percent,
        "memory_gb": memory_gb,
        "storage_gb": storage_gb
    }
    build_id = orchestrator.submit_build(manifest_path, priority, resource_requirements=resource_reqs)
    print(f"Submitted build {build_id} with resource requirements: {resource_reqs}")
    orchestrator.start()
    try:
        # Poll roughly every 5 seconds (plus ~1s of CPU sampling) until terminal.
        while True:
            status = orchestrator.get_build_status(build_id)
            if status:
                print(f"Build {build_id}: {status.status.value} (Progress: {status.progress:.1%})")
                if status.status in [BuildStatus.COMPLETED, BuildStatus.FAILED, BuildStatus.CANCELLED]:
                    if status.status == BuildStatus.FAILED:
                        print(f"Build failed: {status.error_message}")
                    break
            resource_status = orchestrator.get_resource_status()
            print(f"Resources: CPU {resource_status['available_resources']['cpu_percent']:.1f}% free, "
                  f"Memory {resource_status['available_resources']['memory_gb']:.1f}GB free")
            time.sleep(5)
    except KeyboardInterrupt:
        print("\nStopping orchestrator...")
    finally:
        # Always shut the worker down, on normal completion as well as Ctrl-C.
        orchestrator.stop()


if __name__ == "__main__":
    main()