diff --git a/build-logs/build-000001.log b/build-logs/build-000001.log index f2c0eae7..92cffb83 100644 --- a/build-logs/build-000001.log +++ b/build-logs/build-000001.log @@ -1,3 +1,6 @@ 2025-08-22T18:43:44.007228 - Build build-000001: Build submitted - Priority: 5 2025-08-22T18:43:44.008134 - Build build-000001: Build submitted - Priority: 5 2025-08-22T18:43:45.009838 - Build build-000001: Build submitted - Priority: 5 +2025-08-22T20:45:25.433439 - Build build-000001: Build submitted - Priority: 5 +2025-08-22T20:45:45.179487 - Build build-000001: Build submitted - Priority: 5 +2025-08-22T20:45:55.222544 - Build build-000001: Build submitted - Priority: 5 diff --git a/build_environment.py b/build_environment.py new file mode 100644 index 00000000..f96955fb --- /dev/null +++ b/build_environment.py @@ -0,0 +1,475 @@ +#!/usr/bin/python3 +""" +Debian Forge Build Environment Manager + +Manages isolated build environments, cleanup, and reuse policies for OSBuild builds. +""" + +import os +import sys +import time +import shutil +import tempfile +import subprocess +import threading +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any, Tuple +from pathlib import Path +from dataclasses import dataclass, asdict +from enum import Enum + + +class EnvironmentStatus(Enum): + CREATING = "creating" + READY = "ready" + IN_USE = "in_use" + CLEANING = "cleaning" + CLEANED = "cleaned" + FAILED = "failed" + + +@dataclass +class BuildEnvironment: + """Represents a build environment""" + id: str + base_path: str + status: EnvironmentStatus + created_at: datetime + last_used: Optional[datetime] = None + use_count: int = 0 + metadata: Optional[Dict[str, Any]] = None + resource_usage: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization""" + data = asdict(self) + data['status'] = self.status.value + data['created_at'] = self.created_at.isoformat() + if self.last_used: + data['last_used'] = self.last_used.isoformat() + return data + + +class EnvironmentIsolation: + """Handles build environment isolation""" + + def __init__(self, base_dir: str = "build-environments"): + self.base_dir = Path(base_dir) + self.base_dir.mkdir(exist_ok=True) + + def create_isolated_environment(self, env_id: str, base_image: Optional[str] = None) -> str: + """Create an isolated build environment""" + env_path = self.base_dir / env_id + env_path.mkdir(exist_ok=True) + + # Create isolation structure + isolation_dirs = [ + "rootfs", + "overlay", + "work", + "metadata" + ] + + for dir_name in isolation_dirs: + (env_path / dir_name).mkdir(exist_ok=True) + + # Create basic isolation files + self._create_isolation_files(env_path) + + # If base image provided, extract it + if base_image and os.path.exists(base_image): + self._extract_base_image(env_path, base_image) + + return str(env_path) + + def _create_isolation_files(self, env_path: Path): + """Create basic isolation configuration files""" + # Create environment configuration + env_config = env_path / "metadata" / "environment.conf" + env_config.parent.mkdir(exist_ok=True) + + with open(env_config, 'w') as f: + f.write(f"# Build environment configuration\n") + f.write(f"created_at: {datetime.now().isoformat()}\n") + f.write(f"isolation_level: strict\n") + f.write(f"base_path: {env_path}\n") + + # Create mount points file + mount_points = env_path / "metadata" / "mounts" + with open(mount_points, 'w') as f: + f.write("# Mount points for isolation\n") + f.write("/proc\n") + f.write("/sys\n") + f.write("/dev\n") + f.write("/tmp\n") + + def _extract_base_image(self, env_path: Path, base_image: str): + """Extract base image to environment""" + try: + # For now, just copy the base image + # In a real implementation, this would extract and mount + shutil.copy2(base_image, env_path / "rootfs" / "base-image") + except Exception as e: + print(f"Warning: Failed to extract base image: {e}") + + def cleanup_environment(self, env_path: str) -> bool: + """Clean up an environment completely""" + try: + env_path_obj = Path(env_path) + if env_path_obj.exists(): + shutil.rmtree(env_path_obj) + return True + return False + except Exception as e: + print(f"Error cleaning up environment {env_path}: {e}") + return False + + def verify_isolation(self, env_path: str) -> bool: + """Verify that environment isolation is working""" + env_path_obj = Path(env_path) + + # Check isolation structure + required_dirs = ["rootfs", "overlay", "work", "metadata"] + for dir_name in required_dirs: + if not (env_path_obj / dir_name).exists(): + return False + + # Check isolation files + required_files = ["metadata/environment.conf", "metadata/mounts"] + for file_name in required_files: + if not (env_path_obj / file_name).exists(): + return False + + return True + + +class EnvironmentCleanup: + """Handles automatic environment cleanup""" + + def __init__(self, cleanup_policy: Dict[str, Any] = None): + self.cleanup_policy = cleanup_policy or { + "max_age_days": 7, + "max_use_count": 10, + "cleanup_interval_hours": 24, + "keep_minimum": 2 + } + self.cleanup_thread = None + self.running = False + + def start_cleanup_scheduler(self): + """Start the cleanup scheduler thread""" + if self.cleanup_thread is None or not self.cleanup_thread.is_alive(): + self.running = True + self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) + self.cleanup_thread.start() + + def stop_cleanup_scheduler(self): + """Stop the cleanup scheduler""" + self.running = False + if self.cleanup_thread and self.cleanup_thread.is_alive(): + self.cleanup_thread.join() + + def _cleanup_loop(self): + """Main cleanup loop""" + while self.running: + try: + self._perform_cleanup() + # Sleep for cleanup interval + time.sleep(self.cleanup_policy["cleanup_interval_hours"] * 3600) + except Exception as e: + print(f"Cleanup error: {e}") + time.sleep(3600) # Sleep for 1 hour on error + + def _perform_cleanup(self): + """Perform actual cleanup operations""" + # This would be implemented to work with the environment manager + # For now, it's a placeholder + pass + + def should_cleanup_environment(self, env: BuildEnvironment) -> bool: + """Determine if an environment should be cleaned up""" + now = datetime.now() + + # Check age + age_days = (now - env.created_at).days + if age_days > self.cleanup_policy["max_age_days"]: + return True + + # Check use count + if env.use_count > self.cleanup_policy["max_use_count"]: + return True + + return False + + +class HostMonitoring: + """Monitors build host health and resource usage""" + + def __init__(self): + self.health_checks = [] + self.monitoring_interval = 30 # seconds + + def add_health_check(self, check_func): + """Add a health check function""" + self.health_checks.append(check_func) + + def get_host_health(self) -> Dict[str, Any]: + """Get current host health status""" + health_status = { + "timestamp": datetime.now().isoformat(), + "overall_status": "healthy", + "checks": {}, + "resource_usage": self._get_resource_usage() + } + + # Run health checks + for check_func in self.health_checks: + try: + check_name = check_func.__name__ + check_result = check_func() + health_status["checks"][check_name] = check_result + + if not check_result.get("healthy", True): + health_status["overall_status"] = "unhealthy" + except Exception as e: + health_status["checks"][check_func.__name__] = { + "healthy": False, + "error": str(e) + } + health_status["overall_status"] = "unhealthy" + + return health_status + + def _get_resource_usage(self) -> Dict[str, Any]: + """Get current resource usage""" + try: + import psutil + + cpu_percent = psutil.cpu_percent(interval=1) + memory = psutil.virtual_memory() + disk = psutil.disk_usage('/') + + return { + "cpu_percent": cpu_percent, + "memory_percent": memory.percent, + "memory_available_gb": memory.available / (1024**3), + "disk_percent": disk.percent, + "disk_free_gb": disk.free / (1024**3) + } + except ImportError: + return {"error": "psutil not available"} + + def check_disk_space(self) -> Dict[str, Any]: + """Check available disk space""" + try: + import psutil + disk = psutil.disk_usage('/') + free_gb = disk.free / (1024**3) + + return { + "healthy": free_gb > 5.0, # Need at least 5GB free + "free_gb": free_gb, + "threshold_gb": 5.0 + } + except ImportError: + return {"healthy": False, "error": "psutil not available"} + + def check_memory_usage(self) -> Dict[str, Any]: + """Check memory usage""" + try: + import psutil + memory = psutil.virtual_memory() + available_gb = memory.available / (1024**3) + + return { + "healthy": available_gb > 2.0, # Need at least 2GB free + "available_gb": available_gb, + "threshold_gb": 2.0 + } + except ImportError: + return {"healthy": False, "error": "psutil not available"} + + def check_cpu_usage(self) -> Dict[str, Any]: + """Check CPU usage""" + try: + import psutil + cpu_percent = psutil.cpu_percent(interval=1) + + return { + "healthy": cpu_percent < 90.0, # CPU should be less than 90% + "cpu_percent": cpu_percent, + "threshold_percent": 90.0 + } + except ImportError: + return {"healthy": False, "error": "psutil not available"} + + +class BuildEnvironmentManager: + """Main build environment management system""" + + def __init__(self, base_dir: str = "build-environments"): + self.isolation = EnvironmentIsolation(base_dir) + self.cleanup = EnvironmentCleanup() + self.monitoring = HostMonitoring() + self.environments: Dict[str, BuildEnvironment] = {} + self.lock = threading.Lock() + + # Add default health checks + self.monitoring.add_health_check(self.monitoring.check_disk_space) + self.monitoring.add_health_check(self.monitoring.check_memory_usage) + self.monitoring.add_health_check(self.monitoring.check_cpu_usage) + + # Start cleanup scheduler + self.cleanup.start_cleanup_scheduler() + + def create_environment(self, env_id: str, base_image: Optional[str] = None) -> str: + """Create a new build environment""" + with self.lock: + if env_id in self.environments: + raise ValueError(f"Environment {env_id} already exists") + + # Create environment + env_path = self.isolation.create_isolated_environment(env_id, base_image) + + # Create environment record + env = BuildEnvironment( + id=env_id, + base_path=env_path, + status=EnvironmentStatus.CREATING, + created_at=datetime.now() + ) + + self.environments[env_id] = env + + # Verify isolation + if self.isolation.verify_isolation(env_path): + env.status = EnvironmentStatus.READY + print(f"✅ Environment {env_id} created successfully") + else: + env.status = EnvironmentStatus.FAILED + print(f"❌ Environment {env_id} isolation verification failed") + + return env_path + + def get_environment(self, env_id: str) -> Optional[BuildEnvironment]: + """Get an environment by ID""" + return self.environments.get(env_id) + + def use_environment(self, env_id: str) -> bool: + """Mark environment as in use""" + with self.lock: + env = self.environments.get(env_id) + if env and env.status == EnvironmentStatus.READY: + env.status = EnvironmentStatus.IN_USE + env.last_used = datetime.now() + env.use_count += 1 + return True + return False + + def release_environment(self, env_id: str) -> bool: + """Release environment back to ready state""" + with self.lock: + env = self.environments.get(env_id) + if env and env.status == EnvironmentStatus.IN_USE: + env.status = EnvironmentStatus.READY + return True + return False + + def cleanup_environment(self, env_id: str) -> bool: + """Clean up a specific environment""" + with self.lock: + env = self.environments.get(env_id) + if env: + env.status = EnvironmentStatus.CLEANING + + # Clean up files + if self.isolation.cleanup_environment(env.base_path): + env.status = EnvironmentStatus.CLEANED + del self.environments[env_id] + print(f"✅ Environment {env_id} cleaned up successfully") + return True + else: + env.status = EnvironmentStatus.FAILED + print(f"❌ Failed to clean up environment {env_id}") + return False + return False + + def get_available_environments(self) -> List[BuildEnvironment]: + """Get list of available environments""" + with self.lock: + return [env for env in self.environments.values() + if env.status == EnvironmentStatus.READY] + + def get_host_health(self) -> Dict[str, Any]: + """Get current host health status""" + return self.monitoring.get_host_health() + + def cleanup_old_environments(self) -> int: + """Clean up old environments based on policy""" + cleaned_count = 0 + + with self.lock: + envs_to_cleanup = [] + for env in self.environments.values(): + if self.cleanup.should_cleanup_environment(env): + envs_to_cleanup.append(env.id) + + for env_id in envs_to_cleanup: + if self.cleanup_environment(env_id): + cleaned_count += 1 + + return cleaned_count + + def shutdown(self): + """Shutdown the environment manager""" + self.cleanup.stop_cleanup_scheduler() + + # Clean up all environments + with self.lock: + for env_id in list(self.environments.keys()): + self.cleanup_environment(env_id) + + +def main(): + """Example usage of the build environment manager""" + print("Debian Forge Build Environment Manager") + print("=" * 50) + + # Create environment manager + manager = BuildEnvironmentManager() + + try: + # Create test environment + print("Creating test environment...") + env_path = manager.create_environment("test-env-001") + + # Check host health + print("\nChecking host health...") + health = manager.get_host_health() + print(f"Overall status: {health['overall_status']}") + print(f"Resource usage: {health['resource_usage']}") + + # Use environment + print("\nUsing environment...") + if manager.use_environment("test-env-001"): + print("✅ Environment marked as in use") + + # Release environment + if manager.release_environment("test-env-001"): + print("✅ Environment released") + + # Get available environments + available = manager.get_available_environments() + print(f"\nAvailable environments: {len(available)}") + + # Clean up test environment + print("\nCleaning up test environment...") + if manager.cleanup_environment("test-env-001"): + print("✅ Test environment cleaned up") + + finally: + manager.shutdown() + + +if __name__ == "__main__": + main() diff --git a/osbuild_integration.py b/osbuild_integration.py new file mode 100644 index 00000000..3cb37d9b --- /dev/null +++ b/osbuild_integration.py @@ -0,0 +1,338 @@ +#!/usr/bin/python3 +""" +Debian Forge OSBuild Integration + +Integrates modified OSBuild with the build orchestration system. +""" + +import os +import sys +import json +import subprocess +import tempfile +from pathlib import Path +from typing import Dict, List, Optional, Any, Tuple +from build_orchestrator import BuildOrchestrator, BuildStatus +from build_environment import BuildEnvironmentManager +from artifact_manager import ArtifactManager + + +class OSBuildIntegration: + """Integrates OSBuild with Debian Forge build orchestration""" + + def __init__(self, osbuild_path: str = "python3 -m osbuild"): + self.osbuild_path = osbuild_path + self.orchestrator = BuildOrchestrator() + self.env_manager = BuildEnvironmentManager() + self.artifact_manager = ArtifactManager() + + def submit_osbuild_pipeline(self, manifest_path: str, priority: int = 5, + resource_requirements: Optional[Dict[str, Any]] = None, + environment_id: Optional[str] = None) -> str: + """Submit an OSBuild pipeline for execution""" + + # Validate manifest + if not self._validate_manifest(manifest_path): + raise ValueError(f"Invalid manifest: {manifest_path}") + + # Create build environment if specified + if environment_id: + if not self.env_manager.get_environment(environment_id): + env_path = self.env_manager.create_environment(environment_id) + print(f"Created build environment: {environment_id}") + + # Submit build to orchestrator + build_id = self.orchestrator.submit_build( + manifest_path, + priority=priority, + resource_requirements=resource_requirements or {}, + metadata={ + "type": "osbuild_pipeline", + "environment_id": environment_id, + "manifest_path": manifest_path + } + ) + + print(f"Submitted OSBuild pipeline: {build_id}") + return build_id + + def execute_pipeline(self, manifest_path: str, output_dir: str, + environment_id: Optional[str] = None) -> Tuple[bool, Optional[str]]: + """Execute an OSBuild pipeline directly""" + + # Create output directory + os.makedirs(output_dir, exist_ok=True) + + # Set up environment if specified + env_vars = {} + if environment_id: + env = self.env_manager.get_environment(environment_id) + if env: + self.env_manager.use_environment(environment_id) + env_vars["OSBUILD_ENV_PATH"] = env.base_path + try: + # Execute OSBuild + result = self._run_osbuild(manifest_path, output_dir, env_vars) + return result + finally: + self.env_manager.release_environment(environment_id) + else: + return False, f"Environment {environment_id} not found" + else: + # Execute without specific environment + return self._run_osbuild(manifest_path, output_dir, env_vars) + + def _run_osbuild(self, manifest_path: str, output_dir: str, env_vars: Dict[str, str]) -> Tuple[bool, Optional[str]]: + """Run OSBuild command""" + + # Build OSBuild command + cmd = [ + "python3", "-m", "osbuild", + "--libdir", ".", + "--output-dir", output_dir, + manifest_path + ] + + print(f"Executing OSBuild: {' '.join(cmd)}") + + try: + # Run OSBuild + result = subprocess.run( + cmd, + capture_output=True, + text=True, + cwd=os.getcwd(), + env={**os.environ, **env_vars} + ) + + if result.returncode == 0: + print("✅ OSBuild pipeline completed successfully") + return True, None + else: + error_msg = f"OSBuild failed with return code {result.returncode}" + if result.stderr: + error_msg += f"\nStderr: {result.stderr}" + return False, error_msg + + except Exception as e: + error_msg = f"Failed to execute OSBuild: {str(e)}" + return False, error_msg + + def _validate_manifest(self, manifest_path: str) -> bool: + """Validate OSBuild manifest""" + try: + with open(manifest_path, 'r') as f: + manifest = json.load(f) + + # Check basic structure + if "version" not in manifest: + print("❌ Manifest missing version") + return False + + if "pipelines" not in manifest: + print("❌ Manifest missing pipelines") + return False + + # Validate pipelines + for pipeline in manifest["pipelines"]: + if "name" not in pipeline: + print("❌ Pipeline missing name") + return False + + if "runner" not in pipeline: + print("❌ Pipeline missing runner") + return False + + if "stages" not in pipeline: + print("❌ Pipeline missing stages") + return False + + # Validate stages + for stage in pipeline["stages"]: + if "name" not in stage: + print("❌ Stage missing name") + return False + + print("✅ Manifest validation passed") + return True + + except FileNotFoundError: + print(f"❌ Manifest file not found: {manifest_path}") + return False + except json.JSONDecodeError as e: + print(f"❌ Invalid JSON in manifest: {e}") + return False + except Exception as e: + print(f"❌ Manifest validation error: {e}") + return False + + def get_pipeline_status(self, build_id: str) -> Optional[Dict[str, Any]]: + """Get pipeline execution status""" + build_status = self.orchestrator.get_build_status(build_id) + if not build_status: + return None + + # Get artifacts for this build + artifacts = self.artifact_manager.get_build_artifacts(build_id) + + # Get environment info if available + environment_info = None + if build_status.metadata and "environment_id" in build_status.metadata: + env_id = build_status.metadata["environment_id"] + env = self.env_manager.get_environment(env_id) + if env: + environment_info = env.to_dict() + + return { + "build_id": build_id, + "status": build_status.status.value, + "progress": build_status.progress, + "submitted_at": build_status.submitted_at.isoformat(), + "started_at": build_status.started_at.isoformat() if build_status.started_at else None, + "completed_at": build_status.completed_at.isoformat() if build_status.completed_at else None, + "error_message": build_status.error_message, + "artifacts": [a.to_dict() for a in artifacts], + "environment": environment_info, + "metadata": build_status.metadata + } + + def list_pipelines(self) -> Dict[str, List[Dict[str, Any]]]: + """List all pipeline builds""" + builds = self.orchestrator.list_builds() + + result = {} + for status, build_list in builds.items(): + result[status] = [] + for build in build_list: + if build.metadata and build.metadata.get("type") == "osbuild_pipeline": + result[status].append({ + "build_id": build.id, + "manifest_path": build.manifest_path, + "priority": build.priority, + "status": build.status.value, + "submitted_at": build.submitted_at.isoformat() + }) + + return result + + def cancel_pipeline(self, build_id: str) -> bool: + """Cancel a pipeline execution""" + return self.orchestrator.cancel_build(build_id) + + def get_pipeline_logs(self, build_id: str) -> List[str]: + """Get logs for a pipeline execution""" + return self.orchestrator.get_build_logs(build_id) + + +def create_test_debian_manifest() -> Dict[str, Any]: + """Create a test Debian manifest for integration testing""" + return { + "version": "2", + "pipelines": [ + { + "name": "debian-base-system", + "runner": "org.osbuild.linux", + "stages": [ + { + "name": "org.osbuild.mkdir", + "options": { + "paths": ["/tmp/debian-test"] + } + }, + { + "name": "org.osbuild.copy", + "options": { + "paths": [ + { + "from": "test-debian-manifest.json", + "to": "/tmp/debian-test/manifest.json" + } + ] + } + }, + { + "name": "org.osbuild.shell", + "options": { + "script": "echo 'Debian pipeline test completed' > /tmp/debian-test/status.txt" + } + } + ] + } + ] + } + + +def test_osbuild_integration(): + """Test OSBuild integration functionality""" + print("Testing OSBuild Integration...") + + integration = OSBuildIntegration() + + # Create test manifest + test_manifest = create_test_debian_manifest() + manifest_path = "test-osbuild-integration.json" + + with open(manifest_path, 'w') as f: + json.dump(test_manifest, f, indent=2) + + try: + # Test manifest validation + print("\n1. Testing manifest validation...") + if integration._validate_manifest(manifest_path): + print("✅ Manifest validation passed") + else: + print("❌ Manifest validation failed") + return False + + # Test pipeline submission + print("\n2. Testing pipeline submission...") + build_id = integration.submit_osbuild_pipeline( + manifest_path, + priority=5, + resource_requirements={"cpu_percent": 10, "memory_gb": 1, "storage_gb": 1} + ) + print(f"✅ Pipeline submitted: {build_id}") + + # Test pipeline status + print("\n3. Testing pipeline status...") + status = integration.get_pipeline_status(build_id) + if status: + print(f"✅ Pipeline status retrieved: {status['status']}") + else: + print("❌ Failed to get pipeline status") + return False + + # Test pipeline listing + print("\n4. Testing pipeline listing...") + pipelines = integration.list_pipelines() + if pipelines and "pending" in pipelines and len(pipelines["pending"]) > 0: + print(f"✅ Pipeline listing working: {len(pipelines['pending'])} pending") + else: + print("❌ Pipeline listing failed") + return False + + print("\n🎉 All OSBuild integration tests passed!") + return True + + except Exception as e: + print(f"❌ OSBuild integration test failed: {e}") + return False + finally: + # Cleanup + if os.path.exists(manifest_path): + os.remove(manifest_path) + + +def main(): + """Main function for OSBuild integration testing""" + print("Debian Forge OSBuild Integration") + print("=" * 40) + + if test_osbuild_integration(): + return 0 + else: + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/test-build-lifecycle.py b/test-build-lifecycle.py new file mode 100644 index 00000000..baf3fe69 --- /dev/null +++ b/test-build-lifecycle.py @@ -0,0 +1,394 @@ +#!/usr/bin/python3 +""" +Test script for complete build lifecycle in Debian Forge + +This script tests the entire build process from submission to completion, +including failure handling, retry mechanisms, and cleanup. +""" + +import sys +import time +import tempfile +import os +import json +from pathlib import Path +from build_orchestrator import BuildOrchestrator, BuildStatus +from artifact_manager import ArtifactManager + + +def test_build_submission_to_completion(): + """Test complete build lifecycle from submission to completion""" + print("Testing build submission to completion...") + + # Create orchestrator and artifact manager + orchestrator = BuildOrchestrator() + artifact_manager = ArtifactManager() + + # Create a simple test manifest + test_manifest = create_test_manifest() + manifest_path = "test-lifecycle-manifest.json" + + with open(manifest_path, 'w') as f: + json.dump(test_manifest, f, indent=2) + + try: + # Submit build + build_id = orchestrator.submit_build( + manifest_path, + priority=5, + resource_requirements={"cpu_percent": 10, "memory_gb": 1, "storage_gb": 1} + ) + + print(f"Submitted build {build_id}") + + # Start orchestrator + orchestrator.start() + + # Monitor build progress + start_time = time.time() + max_wait_time = 30 # 30 seconds max + + while time.time() - start_time < max_wait_time: + status = orchestrator.get_build_status(build_id) + if status: + print(f"Build {build_id}: {status.status.value} (Progress: {status.progress:.1%})") + + # Check for completion + if status.status in [BuildStatus.COMPLETED, BuildStatus.FAILED]: + if status.status == BuildStatus.COMPLETED: + print(f"✅ Build {build_id} completed successfully") + + # Verify artifacts were created + artifacts = artifact_manager.get_build_artifacts(build_id) + if artifacts: + print(f"✅ Build artifacts created: {len(artifacts)} artifacts") + return True + else: + print("❌ No build artifacts found") + return False + else: + print(f"❌ Build {build_id} failed: {status.error_message}") + return False + + # Check for timeout + if time.time() - start_time > max_wait_time: + print("❌ Build timed out") + break + + time.sleep(2) + + return False + + finally: + orchestrator.stop() + # Cleanup + if os.path.exists(manifest_path): + os.remove(manifest_path) + + +def test_build_failure_handling(): + """Test build failure handling and error reporting""" + print("Testing build failure handling...") + + orchestrator = BuildOrchestrator() + + # Submit build with invalid manifest (should fail) + invalid_manifest_path = "nonexistent-manifest.json" + + build_id = orchestrator.submit_build( + invalid_manifest_path, + priority=5, + resource_requirements={"cpu_percent": 10, "memory_gb": 1, "storage_gb": 1} + ) + + print(f"Submitted build {build_id} with invalid manifest") + + # Start orchestrator + orchestrator.start() + + try: + # Monitor for failure + start_time = time.time() + max_wait_time = 20 + + while time.time() - start_time < max_wait_time: + status = orchestrator.get_build_status(build_id) + if status: + print(f"Build {build_id}: {status.status.value}") + + if status.status == BuildStatus.FAILED: + print(f"✅ Build failed as expected: {status.error_message}") + return True + elif status.status == BuildStatus.COMPLETED: + print("❌ Build should have failed but completed") + return False + + time.sleep(1) + + print("❌ Build did not fail within expected time") + return False + + finally: + orchestrator.stop() + + +def test_build_retry_mechanisms(): + """Test build retry mechanisms""" + print("Testing build retry mechanisms...") + + orchestrator = BuildOrchestrator() + + # Submit multiple builds to test queue behavior + build_ids = [] + for i in range(3): + build_id = orchestrator.submit_build( + "test-manifest.json", + priority=5-i, # Different priorities + resource_requirements={"cpu_percent": 10, "memory_gb": 1, "storage_gb": 1} + ) + build_ids.append(build_id) + print(f"Submitted build {build_id} with priority {5-i}") + + # Start orchestrator + orchestrator.start() + + try: + # Monitor builds + start_time = time.time() + max_wait_time = 15 + + while time.time() - start_time < max_wait_time: + # Check status of all builds + all_completed = True + for build_id in build_ids: + status = orchestrator.get_build_status(build_id) + if status and status.status not in [BuildStatus.COMPLETED, BuildStatus.FAILED]: + all_completed = False + print(f"Build {build_id}: {status.status.value}") + + if all_completed: + print("✅ All builds completed") + return True + + time.sleep(1) + + print("❌ Not all builds completed within expected time") + return False + + finally: + orchestrator.stop() + + +def test_build_cleanup(): + """Test build cleanup operations""" + print("Testing build cleanup...") + + orchestrator = BuildOrchestrator() + artifact_manager = ArtifactManager() + + # Submit a build + build_id = orchestrator.submit_build( + "test-debian-manifest.json", + priority=5, + resource_requirements={"cpu_percent": 10, "memory_gb": 1, "storage_gb": 1} + ) + + print(f"Submitted build {build_id} for cleanup test") + + # Start orchestrator briefly + orchestrator.start() + time.sleep(5) # Let it run for a bit + orchestrator.stop() + + # Test build cancellation + if orchestrator.cancel_build(build_id): + print(f"✅ Build {build_id} cancelled successfully") + + # Verify build status + status = orchestrator.get_build_status(build_id) + if status and status.status == BuildStatus.CANCELLED: + print("✅ Build status correctly shows cancelled") + + # Test artifact cleanup + artifacts = artifact_manager.get_build_artifacts(build_id) + if artifacts: + # Remove artifacts + for artifact in artifacts: + if artifact_manager.remove_artifact(artifact.id): + print(f"✅ Removed artifact {artifact.id}") + + # Verify cleanup + remaining_artifacts = artifact_manager.get_build_artifacts(build_id) + if not remaining_artifacts: + print("✅ All artifacts cleaned up successfully") + return True + else: + print(f"❌ {len(remaining_artifacts)} artifacts still remain") + return False + else: + print("✅ No artifacts to clean up") + return True + else: + print("❌ Build status not correctly updated after cancellation") + return False + else: + print("❌ Failed to cancel build") + return False + + +def test_environment_isolation(): + """Test build environment isolation""" + print("Testing build environment isolation...") + + # Create temporary build directories + with tempfile.TemporaryDirectory() as temp_dir: + build_dir = os.path.join(temp_dir, "build-001") + os.makedirs(build_dir, exist_ok=True) + + # Create isolated environment files + env_files = [ + "etc/environment", + "etc/hostname", + "var/lib/osbuild" + ] + + for env_file in env_files: + full_path = os.path.join(build_dir, env_file) + os.makedirs(os.path.dirname(full_path), exist_ok=True) + with open(full_path, 'w') as f: + f.write(f"# Isolated environment file: {env_file}\n") + + # Verify isolation + isolated_files = [] + for root, dirs, files in os.walk(build_dir): + for file in files: + file_path = os.path.join(root, file) + isolated_files.append(file_path) + + if len(isolated_files) == len(env_files): + print(f"✅ Environment isolation working: {len(isolated_files)} files created") + return True + else: + print(f"❌ Environment isolation failed: expected {len(env_files)}, got {len(isolated_files)}") + return False + + +def test_resource_cleanup(): + """Test resource cleanup after builds""" + print("Testing resource cleanup...") + + orchestrator = BuildOrchestrator() + + # Submit builds to consume resources + build_ids = [] + for i in range(2): + build_id = orchestrator.submit_build( + "test-manifest.json", + priority=5, + resource_requirements={"cpu_percent": 20, "memory_gb": 2, "storage_gb": 2} + ) + build_ids.append(build_id) + + # Start orchestrator + orchestrator.start() + + try: + # Let builds run briefly + time.sleep(5) + + # Check resource allocation + initial_status = orchestrator.get_resource_status() + print(f"Initial allocated resources: {initial_status['allocated_resources']}") + + # Stop orchestrator (should trigger cleanup) + orchestrator.stop() + + # Check resource cleanup + final_status = orchestrator.get_resource_status() + print(f"Final allocated resources: {final_status['allocated_resources']}") + + if final_status['allocated_resources'] == 0: + print("✅ Resources cleaned up successfully") + return True + else: + print("❌ Resources not properly cleaned up") + return False + + finally: + orchestrator.stop() + + +def create_test_manifest(): + """Create a simple test manifest for lifecycle testing""" + return { + "version": "2", + "pipelines": [ + { + "name": "debian-lifecycle-test", + "runner": "org.osbuild.linux", + "stages": [ + { + "name": "org.osbuild.mkdir", + "options": { + "paths": ["/tmp/test-lifecycle"] + } + }, + { + "name": "org.osbuild.copy", + "options": { + "paths": [ + { + "from": "test-debian-manifest.json", + "to": "/tmp/test-lifecycle/manifest.json" + } + ] + } + } + ] + } + ] + } + + +def main(): + """Run all build lifecycle tests""" + print("Debian Forge Build Lifecycle Tests") + print("=" * 50) + + tests = [ + test_build_submission_to_completion, + test_build_failure_handling, + test_build_retry_mechanisms, + test_build_cleanup, + test_environment_isolation, + test_resource_cleanup + ] + + passed = 0 + total = len(tests) + + for test in tests: + try: + print(f"\nRunning {test.__name__}...") + if test(): + passed += 1 + else: + print(f"❌ {test.__name__} failed") + except Exception as e: + print(f"❌ {test.__name__} failed with exception: {e}") + + print() + + print("=" * 50) + print(f"Test Results: {passed}/{total} passed") + + if passed == total: + print("🎉 All build lifecycle tests passed!") + return 0 + else: + print("⚠️ Some tests failed") + return 1 + + +if __name__ == "__main__": + sys.exit(main())