#!/usr/bin/env python3 """ Debian Forge Composer Integration Module This module provides integration between debian-forge and debian-forge-composer, ensuring 1:1 compatibility with the upstream osbuild/osbuild-composer. """ import json import subprocess import time import requests from typing import Dict, List, Optional, Any from dataclasses import dataclass from pathlib import Path @dataclass class ComposerBuildRequest: """Build request for composer integration""" blueprint_name: str compose_type: str architecture: str = "x86_64" distro: str = "debian-12" size: int = 0 upload: bool = False metadata: Optional[Dict[str, Any]] = None @dataclass class ComposerBuildStatus: """Build status from composer""" id: str status: str blueprint_name: str compose_type: str architecture: str created_at: str started_at: Optional[str] = None finished_at: Optional[str] = None error_message: Optional[str] = None class DebianForgeComposer: """Integration with debian-forge-composer (fork of osbuild/osbuild-composer)""" def __init__(self, composer_path: str = "../debian-forge-composer", api_url: str = "http://localhost:8700", api_version: str = "v1"): self.composer_path = Path(composer_path) self.api_url = api_url.rstrip('/') self.api_version = api_version self.session = requests.Session() self.session.headers.update({ 'Content-Type': 'application/json', 'Accept': 'application/json' }) # Verify composer path exists if not self.composer_path.exists(): raise FileNotFoundError(f"Composer path not found: {composer_path}") def start_composer_service(self, config_file: Optional[str] = None) -> bool: """Start the composer service""" try: cmd = [str(self.composer_path / "cmd" / "osbuild-composer" / "osbuild-composer")] if config_file: cmd.extend(["--config", config_file]) # Start service in background process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=str(self.composer_path) ) # Wait a moment for service to start time.sleep(2) # Check if service is responding try: response = requests.get(f"{self.api_url}/api/{self.api_version}/status", timeout=5) return response.status_code == 200 except requests.exceptions.RequestException: return False except Exception as e: print(f"Failed to start composer service: {e}") return False def get_service_status(self) -> Dict[str, Any]: """Get composer service status""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/status") if response.status_code == 200: return response.json() else: return {"status": "error", "code": response.status_code} except requests.exceptions.RequestException as e: return {"status": "error", "message": str(e)} def submit_blueprint(self, blueprint_data: Dict[str, Any]) -> Dict[str, Any]: """Submit a blueprint to composer""" try: response = self.session.post( f"{self.api_url}/api/{self.api_version}/blueprints/new", json=blueprint_data ) if response.status_code == 201: return response.json() else: return {"error": f"Failed to submit blueprint: {response.status_code}", "details": response.text} except requests.exceptions.RequestException as e: return {"error": f"Request failed: {e}"} def get_blueprint(self, blueprint_name: str) -> Dict[str, Any]: """Get blueprint details""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/blueprints/info/{blueprint_name}") if response.status_code == 200: return response.json() else: return {"error": f"Failed to get blueprint: {response.status_code}"} except requests.exceptions.RequestException as e: return {"error": f"Request failed: {e}"} def list_blueprints(self) -> List[str]: """List all available blueprints""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/blueprints/list") if response.status_code == 200: return response.json() else: return [] except requests.exceptions.RequestException: return [] def start_compose(self, request: ComposerBuildRequest) -> Dict[str, Any]: """Start a compose using composer""" compose_data = { "blueprint_name": request.blueprint_name, "compose_type": request.compose_type, "branch": "main", "distro": request.distro, "arch": request.architecture, "image_type": request.compose_type, "size": request.size, "upload": request.upload } if request.metadata: compose_data["metadata"] = request.metadata try: response = self.session.post( f"{self.api_url}/api/{self.api_version}/compose", json=compose_data ) if response.status_code == 201: return response.json() else: return {"error": f"Failed to start compose: {response.status_code}", "details": response.text} except requests.exceptions.RequestException as e: return {"error": f"Request failed: {e}"} def get_compose_status(self, compose_id: str) -> ComposerBuildStatus: """Get compose status""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/status/{compose_id}") if response.status_code == 200: data = response.json() return ComposerBuildStatus( id=data.get("id", compose_id), status=data.get("status", "unknown"), blueprint_name=data.get("blueprint", ""), compose_type=data.get("image_type", ""), architecture=data.get("arch", ""), created_at=data.get("created_at", ""), started_at=data.get("started_at"), finished_at=data.get("finished_at"), error_message=data.get("error", {}).get("message") if data.get("error") else None ) else: return ComposerBuildStatus( id=compose_id, status="error", blueprint_name="", compose_type="", architecture="", created_at="", error_message=f"HTTP {response.status_code}" ) except requests.exceptions.RequestException as e: return ComposerBuildStatus( id=compose_id, status="error", blueprint_name="", compose_type="", architecture="", created_at="", error_message=str(e) ) def list_composes(self) -> List[Dict[str, Any]]: """List all composes""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/list") if response.status_code == 200: return response.json() else: return [] except requests.exceptions.RequestException: return [] def cancel_compose(self, compose_id: str) -> bool: """Cancel a compose""" try: response = self.session.delete(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}") return response.status_code == 200 except requests.exceptions.RequestException: return False def get_compose_logs(self, compose_id: str) -> Dict[str, Any]: """Get compose logs""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}/logs") if response.status_code == 200: return response.json() else: return {"error": f"Failed to get logs: {response.status_code}"} except requests.exceptions.RequestException as e: return {"error": f"Request failed: {e}"} def get_compose_metadata(self, compose_id: str) -> Dict[str, Any]: """Get compose metadata""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}/metadata") if response.status_code == 200: return response.json() else: return {"error": f"Failed to get metadata: {response.status_code}"} except requests.exceptions.RequestException as e: return {"error": f"Request failed: {e}"} def create_debian_blueprint(self, name: str, version: str, packages: List[str], customizations: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Create a Debian-specific blueprint for composer""" blueprint = { "name": name, "version": version, "description": f"Debian atomic blueprint for {name}", "packages": packages, "modules": [], "groups": [], "customizations": customizations or {} } # Add Debian-specific customizations if "debian" not in blueprint["customizations"]: blueprint["customizations"]["debian"] = { "repositories": [ { "name": "debian-main", "baseurl": "http://deb.debian.org/debian", "enabled": True } ] } return blueprint def test_composer_integration(self) -> Dict[str, Any]: """Test composer integration functionality""" results = { "composer_path_exists": self.composer_path.exists(), "api_accessible": False, "blueprint_operations": False, "compose_operations": False } # Test API accessibility try: status = self.get_service_status() if "status" in status and status["status"] != "error": results["api_accessible"] = True except Exception: pass # Test blueprint operations if results["api_accessible"]: try: # Create test blueprint test_blueprint = self.create_debian_blueprint( "test-integration", "1.0.0", ["bash", "coreutils"] ) # Submit blueprint submit_result = self.submit_blueprint(test_blueprint) if "error" not in submit_result: results["blueprint_operations"] = True # Clean up test blueprint # Note: Composer doesn't have a delete blueprint endpoint in standard API except Exception as e: results["blueprint_error"] = str(e) # Test compose operations if results["blueprint_operations"]: try: # Try to start a compose (may fail due to missing distro/repo config) compose_request = ComposerBuildRequest( blueprint_name="test-integration", compose_type="qcow2" ) compose_result = self.start_compose(compose_request) if "error" not in compose_result: results["compose_operations"] = True else: results["compose_error"] = compose_result["error"] except Exception as e: results["compose_error"] = str(e) return results def get_composer_version(self) -> str: """Get composer version information""" try: # Try to get version from API response = self.session.get(f"{self.api_url}/api/{self.api_version}/version") if response.status_code == 200: data = response.json() return data.get("version", "unknown") except: pass # Fallback: try to get version from binary try: composer_binary = self.composer_path / "cmd" / "osbuild-composer" / "osbuild-composer" if composer_binary.exists(): result = subprocess.run([str(composer_binary), "--version"], capture_output=True, text=True, check=True) return result.stdout.strip() except: pass return "Version unknown" def get_system_info(self) -> Dict[str, Any]: """Get system information from composer""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/system/info") if response.status_code == 200: return response.json() else: return {"error": f"Failed to get system info: {response.status_code}"} except requests.exceptions.RequestException as e: return {"error": f"Request failed: {e}"} def get_worker_status(self) -> Dict[str, Any]: """Get worker status from composer""" try: response = self.session.get(f"{self.api_url}/api/{self.api_version}/workers/status") if response.status_code == 200: return response.json() else: return {"error": f"Failed to get worker status: {response.status_code}"} except requests.exceptions.RequestException as e: return {"error": f"Request failed: {e}"}