559 lines
22 KiB
Python
Executable file
559 lines
22 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""
|
|
Debian Forge Build Orchestrator
|
|
|
|
Enhanced build queue management and OSBuild pipeline execution for Debian atomic builds.
|
|
"""
|
|
|
|
import json
import logging
import os
import shlex
import subprocess
import sys
import threading
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

import psutil
|
|
|
|
|
|
class BuildStatus(Enum):
    """States in a build's lifecycle.

    Each member's value is the lowercase string emitted for ``status`` by
    ``BuildRequest.to_dict`` and printed by the command-line monitor.
    """

    # Submitted and waiting in the queue.
    PENDING = "pending"
    # Generic in-progress state; the dispatch path in this module uses
    # QUEUED/PREPARING/BUILDING/FINALIZING instead.
    RUNNING = "running"

    # Terminal states: no further transitions are permitted from these.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

    # Dispatch pipeline, in the order a successful build passes through them.
    QUEUED = "queued"
    PREPARING = "preparing"
    BUILDING = "building"
    FINALIZING = "finalizing"
|
|
|
|
|
|
@dataclass
class BuildRequest:
    """One submitted build together with its mutable runtime state.

    ``to_dict()`` returns a dictionary copy in which ``status`` is the enum's
    string value and the timestamp fields are ISO-8601 strings.
    """

    id: str
    manifest_path: str
    priority: int
    status: BuildStatus
    submitted_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None
    output_dir: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    resource_requirements: Optional[Dict[str, Any]] = None
    # Per-instance list of timestamped messages (see add_log). A factory is
    # used instead of a ``None`` default so the declared type is accurate.
    logs: List[str] = field(default_factory=list)
    progress: float = 0.0  # fraction complete, kept within [0.0, 1.0]

    def __post_init__(self):
        # Callers may still pass ``logs=None`` explicitly; normalise it so
        # ``logs`` is always a list.
        if self.logs is None:
            self.logs = []

    def add_log(self, message: str):
        """Append *message* to ``logs``, prefixed with a local timestamp."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.logs.append(log_entry)

    def update_progress(self, progress: float):
        """Set build progress, clamping *progress* into [0.0, 1.0]."""
        self.progress = max(0.0, min(1.0, progress))

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary copy of this request for serialisation.

        ``status`` becomes its string value; ``submitted_at`` and any set
        ``started_at`` / ``completed_at`` become ISO-8601 strings. Unset
        timestamps stay ``None``.
        """
        data = asdict(self)
        data['status'] = self.status.value
        data['submitted_at'] = self.submitted_at.isoformat()
        if self.started_at is not None:
            data['started_at'] = self.started_at.isoformat()
        if self.completed_at is not None:
            data['completed_at'] = self.completed_at.isoformat()
        return data
|
|
|
|
|
|
class BuildStateMachine:
    """Table of permitted build status transitions.

    Happy path: PENDING -> QUEUED -> PREPARING -> BUILDING -> FINALIZING ->
    COMPLETED. PREPARING, BUILDING and FINALIZING may instead go to FAILED.
    Only PENDING, QUEUED and RUNNING may go to CANCELLED. COMPLETED, FAILED
    and CANCELLED are terminal (no outgoing transitions).

    NOTE(review): no status lists RUNNING as a target, so RUNNING is only
    reachable by assigning it directly.
    """

    def __init__(self):
        # status -> statuses it may move to next
        self.transitions: Dict[BuildStatus, List[BuildStatus]] = {
            BuildStatus.PENDING: [BuildStatus.QUEUED, BuildStatus.CANCELLED],
            BuildStatus.QUEUED: [BuildStatus.PREPARING, BuildStatus.CANCELLED],
            BuildStatus.PREPARING: [BuildStatus.BUILDING, BuildStatus.FAILED],
            BuildStatus.BUILDING: [BuildStatus.FINALIZING, BuildStatus.FAILED],
            BuildStatus.FINALIZING: [BuildStatus.COMPLETED, BuildStatus.FAILED],
            BuildStatus.RUNNING: [BuildStatus.COMPLETED, BuildStatus.FAILED, BuildStatus.CANCELLED],
            BuildStatus.COMPLETED: [],
            BuildStatus.FAILED: [],
            BuildStatus.CANCELLED: []
        }

    def can_transition(self, from_status: BuildStatus, to_status: BuildStatus) -> bool:
        """Return True if moving from *from_status* to *to_status* is allowed.

        Statuses missing from the table permit no transitions.
        """
        return to_status in self.transitions.get(from_status, [])

    def get_valid_transitions(self, current_status: BuildStatus) -> List[BuildStatus]:
        """Return the statuses reachable in one step from *current_status*.

        A fresh list is returned on every call. Handing out the table's own
        list would let a caller that mutates the result silently change
        which transitions can_transition() accepts.
        """
        return list(self.transitions.get(current_status, []))
|
|
|
|
|
|
class BuildLogger:
    """Per-build event logging.

    Each event is sent to the ``debian-forge-builds`` logger and appended to
    ``<log_dir>/<build_id>.log`` (UTF-8) so it can be read back later.
    """

    def __init__(self, log_dir: str = "build-logs"):
        """Create *log_dir* if needed and set up the module logger.

        NOTE: ``logging.basicConfig`` configures the root logger, and only has
        an effect if the root logger has no handlers yet.
        """
        self.log_dir = log_dir
        os.makedirs(log_dir, exist_ok=True)

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger('debian-forge-builds')

    def _log_path(self, build_id: str) -> str:
        """Return the path of the log file for *build_id*."""
        return os.path.join(self.log_dir, f"{build_id}.log")

    def log_build_event(self, build_id: str, event: str, details: Optional[str] = None):
        """Record *event* (plus optional *details*) for *build_id*.

        The message is logged at INFO level and appended to the build's log
        file with an ISO-8601 timestamp prefix.
        """
        message = f"Build {build_id}: {event}"
        if details:
            message += f" - {details}"

        self.logger.info(message)

        timestamp = datetime.now().isoformat()
        # Explicit encoding so files read back identically regardless of the
        # process locale.
        with open(self._log_path(build_id), 'a', encoding='utf-8') as f:
            f.write(f"{timestamp} - {message}\n")

    def get_build_logs(self, build_id: str) -> List[str]:
        """Return the lines of *build_id*'s log file (newlines kept).

        Returns an empty list if no log file exists for the build.
        """
        # EAFP: open directly instead of exists()+open(), which can race with
        # the file being created or removed in between.
        try:
            with open(self._log_path(build_id), 'r', encoding='utf-8') as f:
                return f.readlines()
        except FileNotFoundError:
            return []

    def stream_build_logs(self, build_id: str):
        """Yield the lines currently in *build_id*'s log file, stripped.

        The file is read once from start to end; lines appended after the end
        has been reached are not followed. Yields nothing if there is no log
        file for the build.
        """
        try:
            f = open(self._log_path(build_id), 'r', encoding='utf-8')
        except FileNotFoundError:
            return
        with f:
            for line in f:
                yield line.strip()
|
|
|
|
|
|
class ResourceManager:
    """Track per-build resource reservations against live system capacity."""

    def __init__(self, cpu_sample_interval: float = 1.0):
        """
        Args:
            cpu_sample_interval: seconds ``psutil.cpu_percent`` blocks while
                sampling CPU load; every availability check pays this cost.
        """
        # Used in place of any requirement key a build does not specify.
        self.max_cpu_percent = 80
        self.max_memory_gb = 4
        self.max_storage_gb = 10
        # NOTE(review): the two reserves below are not read anywhere in this
        # class, so they do not currently reduce availability.
        self.reserved_cpu_percent = 20
        self.reserved_memory_gb = 2
        self.cpu_sample_interval = cpu_sample_interval
        # build_id -> requirements recorded by allocate_resources()
        self.allocated_resources: Dict[str, Dict[str, Any]] = {}

    def get_available_resources(self) -> Dict[str, Any]:
        """Return unreserved capacity as ``cpu_percent``, ``memory_gb``, ``storage_gb``.

        Each value is live free capacity from psutil (100 minus CPU load,
        available RAM, free space on ``/``) minus the sum of current
        reservations, floored at 0.

        NOTE(review): psutil's figures already include what running builds
        consume, so also subtracting their reservations is conservative and
        may count the same usage twice.
        """
        busy_cpu = psutil.cpu_percent(interval=self.cpu_sample_interval)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        # Copy the reservations once so the three sums see the same set and
        # the dict is not iterated while another thread may be modifying it.
        reservations = list(self.allocated_resources.values())
        held_cpu = sum(r.get('cpu_percent', 0) for r in reservations)
        held_memory = sum(r.get('memory_gb', 0) for r in reservations)
        held_storage = sum(r.get('storage_gb', 0) for r in reservations)

        gib = 1024 ** 3
        return {
            "cpu_percent": max(0, 100 - busy_cpu - held_cpu),
            "memory_gb": max(0, memory.available / gib - held_memory),
            "storage_gb": max(0, disk.free / gib - held_storage),
        }

    def _effective_requirements(self, requirements: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return a copy of *requirements* with per-build defaults for missing keys."""
        effective = dict(requirements or {})
        effective.setdefault("cpu_percent", self.max_cpu_percent)
        effective.setdefault("memory_gb", self.max_memory_gb)
        effective.setdefault("storage_gb", self.max_storage_gb)
        return effective

    def can_allocate_resources(self, requirements: Dict[str, Any]) -> bool:
        """Return True if *requirements* (defaults filled in) fit current availability."""
        needed = self._effective_requirements(requirements)
        available = self.get_available_resources()
        return all(available[key] >= needed[key]
                   for key in ("cpu_percent", "memory_gb", "storage_gb"))

    def allocate_resources(self, build_id: str, requirements: Dict[str, Any]) -> bool:
        """Reserve resources for *build_id* if they are available.

        The recorded reservation is the *effective* requirement set. Storing
        the raw dict meant a build that relied on the defaults was checked
        against them but reserved nothing.

        Returns:
            True if the reservation was recorded, False if it did not fit.
        """
        if not self.can_allocate_resources(requirements):
            return False
        self.allocated_resources[build_id] = self._effective_requirements(requirements)
        return True

    def release_resources(self, build_id: str):
        """Drop *build_id*'s reservation; unknown IDs are ignored."""
        self.allocated_resources.pop(build_id, None)
|
|
|
|
|
|
class BuildQueue:
    """Priority build queue that dispatches builds as resources allow.

    A build lives in exactly one of three collections:

    * ``queue`` -- submitted and waiting, highest ``priority`` first;
    * ``running_builds`` -- handed out by ``get_next_build``;
    * ``completed_builds`` -- ended as COMPLETED, FAILED or CANCELLED.

    All three are read and written under ``self.lock``.
    """

    def __init__(self):
        self.queue: List[BuildRequest] = []
        self.running_builds: Dict[str, BuildRequest] = {}
        self.completed_builds: Dict[str, BuildRequest] = {}
        # Re-entrant: cancel_build() and mark_completed() call
        # update_build_status() while already holding the lock. With a plain
        # Lock that nested acquire deadlocks the calling thread.
        self.lock = threading.RLock()
        self.next_id = 1
        self.resource_manager = ResourceManager()
        self.state_machine = BuildStateMachine()
        self.logger = BuildLogger()

    def submit_build(self, manifest_path: str, priority: int = 5,
                     metadata: Optional[Dict[str, Any]] = None,
                     resource_requirements: Optional[Dict[str, Any]] = None) -> str:
        """Queue a new PENDING build and return its ID (``build-NNNNNN``).

        Args:
            manifest_path: manifest to hand to OSBuild.
            priority: larger values are dispatched first.
            metadata: free-form data stored on the request.
            resource_requirements: optional ``cpu_percent`` / ``memory_gb`` /
                ``storage_gb`` values checked by the resource manager.
        """
        with self.lock:
            build_id = f"build-{self.next_id:06d}"
            self.next_id += 1

            request = BuildRequest(
                id=build_id,
                manifest_path=manifest_path,
                priority=priority,
                status=BuildStatus.PENDING,
                submitted_at=datetime.now(),
                metadata=metadata or {},
                resource_requirements=resource_requirements or {}
            )

            self.queue.append(request)
            # list.sort is stable, so equal priorities keep submission order.
            self.queue.sort(key=lambda x: x.priority, reverse=True)

            self.logger.log_build_event(build_id, "Build submitted", f"Priority: {priority}")
            request.add_log(f"Build submitted with priority {priority}")

            print(f"Submitted build {build_id} with priority {priority}")
            return build_id

    def get_next_build(self) -> Optional[BuildRequest]:
        """Dispatch the highest-priority queued build whose resources fit.

        The chosen build gets its resources reserved, moves PENDING -> QUEUED
        -> PREPARING, has ``started_at`` set and is moved to
        ``running_builds``.

        Returns:
            The dispatched request, or ``None`` if the queue is empty or no
            queued build fits.
        """
        with self.lock:
            for index, request in enumerate(self.queue):
                # allocate_resources() checks availability and records the
                # reservation in one call. A separate check-then-allocate
                # samples resources twice and, if the second sample fails,
                # starts the build with no reservation recorded.
                # NOTE: each check samples CPU load (a blocking psutil call)
                # while the lock is held.
                if not self.resource_manager.allocate_resources(
                        request.id, request.resource_requirements):
                    continue

                # Safe to delete mid-iteration: we return right after.
                del self.queue[index]

                if self.state_machine.can_transition(request.status, BuildStatus.QUEUED):
                    request.status = BuildStatus.QUEUED
                    request.add_log("Build queued for execution")

                if self.state_machine.can_transition(request.status, BuildStatus.PREPARING):
                    request.status = BuildStatus.PREPARING
                    request.add_log("Build preparation started")

                request.started_at = datetime.now()
                self.running_builds[request.id] = request
                self.logger.log_build_event(request.id, "Build started", "Resources allocated")
                return request

            return None

    def update_build_status(self, build_id: str, new_status: BuildStatus,
                            progress: Optional[float] = None, message: Optional[str] = None):
        """Move a running build to *new_status* if the state machine allows it.

        Optionally updates progress and appends *message* to the build's log.
        A terminal status (COMPLETED/FAILED/CANCELLED) finalizes the build.
        Invalid transitions are reported on stdout and otherwise ignored;
        IDs not in ``running_builds`` are silently ignored.
        """
        with self.lock:
            request = self.running_builds.get(build_id)
            if request is None:
                return

            if not self.state_machine.can_transition(request.status, new_status):
                print(f"Invalid status transition: {request.status.value} -> {new_status.value}")
                return

            old_status = request.status
            request.status = new_status

            if progress is not None:
                request.update_progress(progress)

            if message:
                request.add_log(message)

            self.logger.log_build_event(build_id, f"Status changed: {old_status.value} -> {new_status.value}", message)

            if new_status in (BuildStatus.COMPLETED, BuildStatus.FAILED, BuildStatus.CANCELLED):
                self._finalize_build(build_id, new_status)

    def _finalize_build(self, build_id: str, final_status: BuildStatus):
        """Move a finished build to ``completed_builds`` and release its resources.

        Caller must hold ``self.lock``.
        """
        if build_id in self.running_builds:
            request = self.running_builds.pop(build_id)
            request.completed_at = datetime.now()
            request.update_progress(1.0 if final_status == BuildStatus.COMPLETED else 0.0)

            self.resource_manager.release_resources(build_id)

            self.completed_builds[build_id] = request

            self.logger.log_build_event(build_id, "Build completed", f"Final status: {final_status.value}")
            print(f"Build {build_id} completed with status: {final_status.value}")

    def mark_completed(self, build_id: str, output_dir: str, success: bool = True, error_message: Optional[str] = None):
        """Finish a running build as COMPLETED (``success``) or FAILED.

        *output_dir* (and *error_message* on failure) are stored on the
        request when the transition is valid, so they are visible through
        ``get_status``.
        """
        target = BuildStatus.COMPLETED if success else BuildStatus.FAILED
        with self.lock:
            request = self.running_builds.get(build_id)
            # These arguments used to be discarded, leaving e.g.
            # ``error_message`` always None for failed builds.
            if request is not None and self.state_machine.can_transition(request.status, target):
                request.output_dir = output_dir
                if not success:
                    request.error_message = error_message

            if success:
                self.update_build_status(build_id, target, 1.0, "Build completed successfully")
            else:
                self.update_build_status(build_id, target, 0.0, f"Build failed: {error_message}")

    def cancel_build(self, build_id: str) -> bool:
        """Cancel a queued build, or a running one whose status allows CANCELLED.

        Returns:
            True if the build was cancelled, False otherwise.
        """
        with self.lock:
            for index, request in enumerate(self.queue):
                if request.id == build_id:
                    del self.queue[index]
                    request.status = BuildStatus.CANCELLED
                    request.completed_at = datetime.now()
                    request.add_log("Build cancelled by user")
                    self.completed_builds[build_id] = request
                    self.logger.log_build_event(build_id, "Build cancelled", "User requested cancellation")
                    return True

            request = self.running_builds.get(build_id)
            if request is not None and self.state_machine.can_transition(request.status, BuildStatus.CANCELLED):
                # Re-entrant lock makes this nested locked call safe.
                self.update_build_status(build_id, BuildStatus.CANCELLED, 0.0, "Build cancelled by user")
                return True

            return False

    def get_status(self, build_id: str) -> Optional[BuildRequest]:
        """Return the request for *build_id* from any collection, or None."""
        with self.lock:
            for request in self.queue:
                if request.id == build_id:
                    return request

            if build_id in self.running_builds:
                return self.running_builds[build_id]

            if build_id in self.completed_builds:
                return self.completed_builds[build_id]

            return None

    def list_builds(self) -> Dict[str, List[BuildRequest]]:
        """Return snapshots of the pending, running and completed builds."""
        with self.lock:
            return {
                "pending": self.queue.copy(),
                "running": list(self.running_builds.values()),
                "completed": list(self.completed_builds.values())
            }

    def get_resource_status(self) -> Dict[str, Any]:
        """Return available resources plus running/queued/allocated counts.

        Taken under the lock so the figures are consistent with each other and
        the reservation dict is not mutated by the worker mid-read. This
        blocks while CPU load is sampled, as get_next_build already does.
        """
        with self.lock:
            available = self.resource_manager.get_available_resources()
            return {
                "available_resources": available,
                "running_builds": len(self.running_builds),
                "queue_length": len(self.queue),
                "allocated_resources": len(self.resource_manager.allocated_resources)
            }
|
|
|
|
|
|
class OSBuildExecutor:
    """Run OSBuild manifests as subprocesses."""

    def __init__(self, osbuild_path: str = "python3 -m osbuild"):
        """
        Args:
            osbuild_path: command prefix used to invoke OSBuild. It is split
                with shell-like quoting rules, so quote any part containing
                spaces.
        """
        self.osbuild_path = osbuild_path

    def execute_pipeline(self, manifest_path: str, output_dir: str,
                         timeout: Optional[float] = None) -> tuple[bool, Optional[str]]:
        """Run OSBuild on *manifest_path*, writing artifacts to *output_dir*.

        Args:
            manifest_path: manifest file passed to OSBuild.
            output_dir: value for ``--output-dir``; created if missing.
            timeout: optional limit in seconds; ``None`` waits indefinitely.

        Returns:
            ``(True, None)`` if OSBuild exits with status 0, otherwise
            ``(False, message)`` describing the failure (including stderr).
        """
        os.makedirs(output_dir, exist_ok=True)

        try:
            # Build argv as a list so paths containing spaces stay single
            # arguments (splitting a joined string on whitespace broke them
            # apart). Only the configurable command prefix is tokenised.
            cmd = shlex.split(self.osbuild_path) + [
                "--libdir", ".",
                "--output-dir", output_dir,
                manifest_path,
            ]
            print(f"Executing OSBuild: {shlex.join(cmd)}")

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=os.getcwd(),
                timeout=timeout,
            )
        except Exception as e:
            # Boundary for the worker thread: report failures such as a
            # missing executable, a bad command prefix or a timeout instead of
            # letting them escape.
            return False, f"Failed to execute OSBuild: {str(e)}"

        if result.returncode == 0:
            print("OSBuild pipeline completed successfully")
            return True, None

        error_msg = f"OSBuild failed with return code {result.returncode}"
        if result.stderr:
            error_msg += f"\nStderr: {result.stderr}"
        return False, error_msg
|
|
|
|
|
|
class BuildOrchestrator:
    """Tie a BuildQueue to an OSBuildExecutor with one background worker.

    The single worker thread takes the next dispatchable build from the
    queue and runs it to completion, so builds execute one at a time.
    """

    def __init__(self, osbuild_path: str = "python3 -m osbuild", output_root: str = "builds"):
        """
        Args:
            osbuild_path: command used to invoke OSBuild.
            output_root: each build writes artifacts to
                ``<output_root>/<build_id>``.
        """
        self.queue = BuildQueue()
        self.executor = OSBuildExecutor(osbuild_path)
        self.output_root = output_root
        self.running = False
        self.worker_thread = None

    def start(self):
        """Start the background worker thread (no-op if already running)."""
        if self.running:
            return

        self.running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        print("Build orchestrator started")

    def stop(self):
        """Ask the worker to stop and wait for it to exit.

        The worker only checks the flag between builds, so this blocks until
        any build currently executing has finished.
        """
        self.running = False
        if self.worker_thread:
            self.worker_thread.join()
        print("Build orchestrator stopped")

    def _worker_loop(self):
        """Dispatch and run builds until stop() clears ``self.running``."""
        while self.running:
            request = self.queue.get_next_build()
            if not request:
                time.sleep(1)
                continue

            print(f"Processing build {request.id}")
            output_dir = os.path.join(self.output_root, request.id)
            try:
                self._run_build(request, output_dir)
            except Exception as exc:
                # An unexpected error (e.g. an OSError while writing a log
                # file) would otherwise end this thread silently and leave the
                # build in running_builds with its resources still reserved.
                print(f"Build {request.id} aborted by orchestrator error: {exc}")
                self.queue.mark_completed(request.id, output_dir, False, f"Orchestrator error: {exc}")

    def _run_build(self, request: BuildRequest, output_dir: str):
        """Execute one dispatched build and record its outcome in the queue."""
        self.queue.update_build_status(request.id, BuildStatus.BUILDING, 0.1, "OSBuild pipeline started")

        success, error_message = self.executor.execute_pipeline(
            request.manifest_path, output_dir
        )

        if success:
            self.queue.update_build_status(request.id, BuildStatus.FINALIZING, 0.9, "OSBuild pipeline completed")
            self.queue.mark_completed(request.id, output_dir, True)
        else:
            self.queue.mark_completed(request.id, output_dir, False, error_message)

    def submit_build(self, manifest_path: str, priority: int = 5,
                     metadata: Optional[Dict[str, Any]] = None,
                     resource_requirements: Optional[Dict[str, Any]] = None) -> str:
        """Submit a new build request and return its ID."""
        return self.queue.submit_build(manifest_path, priority, metadata, resource_requirements)

    def get_build_status(self, build_id: str) -> Optional[BuildRequest]:
        """Return the request for *build_id*, or None if unknown."""
        return self.queue.get_status(build_id)

    def list_builds(self) -> Dict[str, List[BuildRequest]]:
        """Return pending, running and completed builds."""
        return self.queue.list_builds()

    def get_resource_status(self) -> Dict[str, Any]:
        """Return available resources and queue/running counts."""
        return self.queue.get_resource_status()

    def cancel_build(self, build_id: str) -> bool:
        """Cancel a build; returns True on success."""
        return self.queue.cancel_build(build_id)

    def get_build_logs(self, build_id: str) -> List[str]:
        """Return the lines of *build_id*'s log file."""
        return self.queue.logger.get_build_logs(build_id)
|
|
|
|
|
|
def main():
    """Submit one build from the command line and monitor it until it ends.

    Usage::

        build_orchestrator.py <manifest_path> [priority] [cpu_percent] [memory_gb] [storage_gb]

    Defaults: priority 5, cpu_percent 50, memory_gb 2, storage_gb 5.
    Exits with status 1 on missing/invalid arguments or when the build FAILS.
    """
    usage = ("Usage: python build_orchestrator.py <manifest_path> [priority] "
             "[cpu_percent] [memory_gb] [storage_gb]")
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)

    manifest_path = sys.argv[1]

    def int_arg(position, default):
        # Optional positional integer argument at sys.argv[position].
        return int(sys.argv[position]) if len(sys.argv) > position else default

    try:
        priority = int_arg(2, 5)
        cpu_percent = int_arg(3, 50)
        memory_gb = int_arg(4, 2)
        storage_gb = int_arg(5, 5)
    except ValueError as exc:
        # Report bad input instead of crashing with a traceback.
        print(f"Invalid numeric argument: {exc}")
        print(usage)
        sys.exit(1)

    orchestrator = BuildOrchestrator()

    resource_reqs = {
        "cpu_percent": cpu_percent,
        "memory_gb": memory_gb,
        "storage_gb": storage_gb
    }

    build_id = orchestrator.submit_build(manifest_path, priority, resource_requirements=resource_reqs)
    print(f"Submitted build {build_id} with resource requirements: {resource_reqs}")

    orchestrator.start()

    failed = False
    try:
        while True:
            status = orchestrator.get_build_status(build_id)
            if status:
                print(f"Build {build_id}: {status.status.value} (Progress: {status.progress:.1%})")

                if status.status in [BuildStatus.COMPLETED, BuildStatus.FAILED, BuildStatus.CANCELLED]:
                    if status.status == BuildStatus.FAILED:
                        print(f"Build failed: {status.error_message}")
                        failed = True
                    break

            resource_status = orchestrator.get_resource_status()
            print(f"Resources: CPU {resource_status['available_resources']['cpu_percent']:.1f}% free, "
                  f"Memory {resource_status['available_resources']['memory_gb']:.1f}GB free")

            time.sleep(5)

    except KeyboardInterrupt:
        print("\nStopping orchestrator...")
        orchestrator.stop()

    # A failed build must not report success to the calling shell/CI.
    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()
|