#!/usr/bin/env python3
"""
Debian Forge Composer Client

This module provides a client interface for interacting with OSBuild Composer
to submit builds, monitor status, and manage Debian atomic image creation.
"""

import json
import time
from base64 import b64encode
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests

# Chunk size (bytes) used when streaming an image download to disk.
_DOWNLOAD_CHUNK_SIZE = 1024 * 1024

# Compose states that end polling in ComposerClient.wait_for_completion().
_TERMINAL_STATES = ('FINISHED', 'FAILED')


@dataclass
class BuildRequest:
    """Represents a build request for Debian atomic images.

    Attributes:
        blueprint: Blueprint name; sent as ``blueprint_name`` when composing.
        target: Output image format; sent as ``image_type`` when composing.
        architecture: Target architecture; sent as ``arch`` when composing.
        compose_type: Compose type; sent as ``compose_type`` when composing.
        priority: Priority label. NOTE(review): ``start_compose`` does not
            currently include this field in the request payload.
        metadata: Optional extra data; sent as ``metadata`` when non-empty.
    """

    blueprint: str
    target: str
    architecture: str = "amd64"
    compose_type: str = "debian-atomic"
    priority: str = "normal"
    metadata: Optional[Dict[str, Any]] = None


@dataclass
class BuildStatus:
    """Represents the status of a build.

    Instances are populated by ``ComposerClient.get_compose_status`` from the
    JSON body of the status endpoint.
    """

    build_id: str
    status: str
    created_at: str
    blueprint: str
    target: str
    architecture: str
    progress: Optional[Dict[str, Any]] = None
    logs: Optional[List[str]] = None


class ComposerClient:
    """Client for interacting with OSBuild Composer."""

    def __init__(self, base_url: str = "http://localhost:8700",
                 api_version: str = "v1",
                 username: Optional[str] = None,
                 password: Optional[str] = None,
                 request_timeout: Optional[float] = None):
        """Create a client bound to one composer endpoint.

        Args:
            base_url: Root URL of the composer service; a trailing ``/`` is
                stripped.
            api_version: Version segment used when building request URLs.
            username: Optional user name for HTTP Basic authentication.
            password: Optional password for HTTP Basic authentication.
            request_timeout: Value passed as the ``timeout`` argument of every
                HTTP request. ``None`` (the default) means no timeout, which
                matches the previous behaviour; setting a value is
                recommended so a stalled server cannot block forever.
        """
        self.base_url = base_url.rstrip('/')
        self.api_version = api_version
        self.username = username
        self.password = password
        self.request_timeout = request_timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })

        # Add authentication if credentials provided
        if username and password:
            self._authenticate()

    def _authenticate(self):
        """Install an HTTP Basic ``Authorization`` header on the session.

        Does nothing when either the user name or the password is missing.
        """
        if not self.username or not self.password:
            return

        # Basic authentication for now - can be enhanced with JWT tokens later
        credentials = f"{self.username}:{self.password}"
        encoded_credentials = b64encode(credentials.encode()).decode()
        self.session.headers.update({
            'Authorization': f'Basic {encoded_credentials}'
        })

    def authenticate(self, username: str, password: str) -> bool:
        """Store new credentials and refresh the session's auth header.

        No request is sent to the server, so the credentials are not
        verified here.

        Returns:
            Always ``True``.
        """
        self.username = username
        self.password = password
        self._authenticate()
        return True

    def check_permission(self, permission: str) -> bool:
        """Check if the authenticated user has a specific permission.

        Args:
            permission: Permission name handed to the user manager.

        Returns:
            ``False`` when no user name is set. Otherwise the answer of
            ``UserManager.check_permission``; if the ``user_management``
            module cannot be imported, ``True`` (admin access is assumed).

        NOTE(review): a client without a user name is always denied, even
        when ``user_management`` is unavailable and any named user would be
        allowed - confirm this asymmetry is intended.
        """
        if not self.username:
            return False

        # Import user manager to check permissions
        try:
            from user_management import UserManager
        except ImportError:
            # If user management not available, assume admin access
            return True

        user_mgr = UserManager()
        return user_mgr.check_permission(self.username, permission)

    def _make_request(self, method: str, endpoint: str,
                      data: Optional[Dict] = None,
                      stream: bool = False) -> requests.Response:
        """Make an HTTP request to the composer API.

        Args:
            method: ``GET``, ``POST``, ``PUT`` or ``DELETE`` (any case).
            endpoint: Path below ``/api/<api_version>/``; a leading ``/`` is
                ignored.
            data: JSON body. Only sent for ``POST`` and ``PUT``.
            stream: When true, the response body is not downloaded eagerly;
                the caller must consume and close the response.

        Returns:
            The raw response; the status code is not checked here.

        Raises:
            ValueError: If ``method`` is not one of the supported verbs.
            ConnectionError: If the underlying request fails.
        """
        verb = method.upper()
        if verb not in ('GET', 'POST', 'PUT', 'DELETE'):
            raise ValueError(f"Unsupported HTTP method: {method}")

        url = f"{self.base_url}/api/{self.api_version}/{endpoint.lstrip('/')}"

        kwargs: Dict[str, Any] = {'timeout': self.request_timeout}
        if verb in ('POST', 'PUT'):
            kwargs['json'] = data
        if stream:
            # Only override the session's own stream setting when asked to.
            kwargs['stream'] = True

        try:
            return self.session.request(verb, url, **kwargs)
        except requests.exceptions.RequestException as e:
            raise ConnectionError(f"Failed to connect to composer: {e}") from e

    @staticmethod
    def _raise_for_status(response: requests.Response, expected: int,
                          action: str) -> None:
        """Raise ``RuntimeError`` unless the response has the expected status.

        Args:
            response: Response to inspect.
            expected: Status code that counts as success.
            action: Verb phrase inserted after ``"Failed to "`` in the error.
        """
        if response.status_code != expected:
            raise RuntimeError(
                f"Failed to {action}: {response.status_code} - {response.text}"
            )

    def submit_blueprint(self, blueprint_path: str) -> Dict[str, Any]:
        """Submit a blueprint to composer.

        Args:
            blueprint_path: Path to a JSON blueprint file.

        Returns:
            The decoded JSON body of the 201 response.

        Raises:
            PermissionError: If the user lacks the ``build`` permission.
            FileNotFoundError: If ``blueprint_path`` does not exist.
            RuntimeError: If the server does not answer with 201.
        """
        # Check permission
        if not self.check_permission("build"):
            raise PermissionError("User does not have permission to submit blueprints")

        try:
            with open(blueprint_path, 'r', encoding='utf-8') as f:
                blueprint_data = json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Blueprint file not found: {blueprint_path}"
            ) from None

        response = self._make_request('POST', 'blueprints/new', blueprint_data)
        self._raise_for_status(response, 201, "submit blueprint")
        return response.json()

    def get_blueprint(self, blueprint_name: str) -> Dict[str, Any]:
        """Get blueprint details.

        Raises:
            RuntimeError: If the server does not answer with 200.
        """
        response = self._make_request('GET', f'blueprints/info/{blueprint_name}')
        self._raise_for_status(response, 200, "get blueprint")
        return response.json()

    def list_blueprints(self) -> List[str]:
        """List all available blueprints.

        Raises:
            RuntimeError: If the server does not answer with 200.
        """
        response = self._make_request('GET', 'blueprints/list')
        self._raise_for_status(response, 200, "list blueprints")
        return response.json()

    def start_compose(self, build_request: BuildRequest) -> str:
        """Start a compose for a blueprint.

        Args:
            build_request: Blueprint, image type and architecture to build.

        Returns:
            The compose id reported by the server.

        Raises:
            PermissionError: If the user lacks the ``build`` permission.
            RuntimeError: If the server does not answer with 201, or the
                response carries no ``id`` (an empty id cannot be polled).
        """
        # Check permission
        if not self.check_permission("build"):
            raise PermissionError("User does not have permission to start composes")

        compose_data = {
            "blueprint_name": build_request.blueprint,
            "compose_type": build_request.compose_type,
            "branch": "main",
            "distro": "debian-12",
            "arch": build_request.architecture,
            "image_type": build_request.target,
            "size": 0,
            "upload": False
        }

        if build_request.metadata:
            compose_data["metadata"] = build_request.metadata

        response = self._make_request('POST', 'compose', compose_data)
        self._raise_for_status(response, 201, "start compose")

        compose_id = response.json().get('id')
        if not compose_id:
            raise RuntimeError(
                "Failed to start compose: response did not include a compose id"
            )
        return compose_id

    def get_compose_status(self, compose_id: str) -> BuildStatus:
        """Get the status of a compose.

        Missing keys in the response fall back to ``'unknown'`` for the
        status, empty strings for text fields, ``{}`` for progress and ``[]``
        for logs.

        Raises:
            RuntimeError: If the server does not answer with 200.
        """
        response = self._make_request('GET', f'compose/status/{compose_id}')
        self._raise_for_status(response, 200, "get compose status")

        status_data = response.json()
        return BuildStatus(
            build_id=compose_id,
            status=status_data.get('status', 'unknown'),
            created_at=status_data.get('created_at', ''),
            blueprint=status_data.get('blueprint', ''),
            target=status_data.get('image_type', ''),
            architecture=status_data.get('arch', ''),
            progress=status_data.get('progress', {}),
            logs=status_data.get('logs', [])
        )

    def list_composes(self) -> List[Dict[str, Any]]:
        """List all composes.

        Raises:
            RuntimeError: If the server does not answer with 200.
        """
        response = self._make_request('GET', 'compose/list')
        self._raise_for_status(response, 200, "list composes")
        return response.json()

    def cancel_compose(self, compose_id: str) -> bool:
        """Cancel a running compose.

        Returns:
            ``True`` when the server answers with 200.

        Raises:
            RuntimeError: For any other status code.
        """
        response = self._make_request('DELETE', f'compose/cancel/{compose_id}')
        self._raise_for_status(response, 200, "cancel compose")
        return True

    def get_compose_logs(self, compose_id: str) -> List[str]:
        """Get logs for a compose.

        Raises:
            RuntimeError: If the server does not answer with 200.
        """
        response = self._make_request('GET', f'compose/logs/{compose_id}')
        self._raise_for_status(response, 200, "get compose logs")
        return response.json()

    def download_image(self, compose_id: str, target_dir: str = ".") -> str:
        """Download the generated image.

        The body is streamed to disk in chunks instead of being held in
        memory. Data is first written to a ``.part`` file next to the final
        path and renamed on success, so a failed transfer leaves no file
        under the final name.

        Args:
            compose_id: Compose whose image should be fetched.
            target_dir: Existing directory that receives the file.

        Returns:
            Path of the written file as a string.

        Raises:
            RuntimeError: If the server does not answer with 200.
            ConnectionError: If the connection fails, including mid-transfer.
        """
        response = self._make_request(
            'GET', f'compose/image/{compose_id}', stream=True
        )
        try:
            self._raise_for_status(response, 200, "download image")

            # Save the image file
            filename = f"debian-atomic-{compose_id}.{self._get_image_extension(compose_id)}"
            filepath = Path(target_dir) / filename
            partial = filepath.with_name(filename + ".part")

            try:
                with open(partial, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                        f.write(chunk)
                partial.replace(filepath)
            except requests.exceptions.RequestException as e:
                raise ConnectionError(f"Failed to connect to composer: {e}") from e
            finally:
                # After a successful rename the partial file is already gone.
                if partial.exists():
                    partial.unlink()

            return str(filepath)
        finally:
            # Streamed responses hold the connection until closed.
            response.close()

    def _get_image_extension(self, compose_id: str) -> str:
        """Get the appropriate file extension for the image type.

        Currently always returns ``"qcow2"``; ``compose_id`` is unused.
        """
        # This would need to be determined from the compose type
        return "qcow2"

    def wait_for_completion(self, compose_id: str, timeout: int = 3600,
                            poll_interval: int = 30) -> BuildStatus:
        """Wait for a compose to complete.

        The status is polled at least once, and once more when the deadline
        is reached, so a compose that finishes during the last sleep is
        still reported.

        Args:
            compose_id: Compose to watch.
            timeout: Maximum number of seconds to wait.
            poll_interval: Seconds between polls (shortened near the
                deadline so the wait does not overshoot it).

        Returns:
            The first status whose state is ``FINISHED`` or ``FAILED``.

        Raises:
            TimeoutError: If no terminal state is seen within ``timeout``.
        """
        # Monotonic clock: unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout

        while True:
            status = self.get_compose_status(compose_id)
            if status.status in _TERMINAL_STATES:
                return status

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"Compose {compose_id} did not complete within {timeout} seconds")

            time.sleep(min(poll_interval, remaining))


class DebianAtomicBuilder:
    """High-level interface for building Debian atomic images."""

    def __init__(self, composer_client: ComposerClient):
        """Wrap an existing composer client."""
        self.client = composer_client

    def build_base_image(self, output_format: str = "qcow2",
                         architecture: str = "amd64") -> str:
        """Build a base Debian atomic image and return the downloaded path."""
        build_request = BuildRequest(
            blueprint="debian-atomic-base",
            target=output_format,
            architecture=architecture
        )

        return self._build_image(build_request)

    def build_workstation_image(self, output_format: str = "qcow2",
                                architecture: str = "amd64") -> str:
        """Build a Debian atomic workstation image and return its path."""
        build_request = BuildRequest(
            blueprint="debian-atomic-workstation",
            target=output_format,
            architecture=architecture
        )

        return self._build_image(build_request)

    def build_server_image(self, output_format: str = "qcow2",
                           architecture: str = "amd64") -> str:
        """Build a Debian atomic server image and return its path."""
        build_request = BuildRequest(
            blueprint="debian-atomic-server",
            target=output_format,
            architecture=architecture
        )

        return self._build_image(build_request)

    def _build_image(self, build_request: BuildRequest) -> str:
        """Start a compose, wait for it, and download the result.

        Progress is reported with ``print``.

        Returns:
            Path of the downloaded image.

        Raises:
            RuntimeError: If the compose ends in the ``FAILED`` state.
        """
        print(f"Starting build for {build_request.blueprint}...")

        # Start the compose
        compose_id = self.client.start_compose(build_request)
        print(f"Compose started with ID: {compose_id}")

        # Wait for completion
        print("Waiting for build to complete...")
        status = self.client.wait_for_completion(compose_id)

        if status.status == 'FAILED':
            raise RuntimeError(f"Build failed for {build_request.blueprint}")

        print("Build completed successfully!")

        # Download the image
        print("Downloading image...")
        image_path = self.client.download_image(compose_id)
        print(f"Image downloaded to: {image_path}")

        return image_path


def main():
    """Example usage of the composer client."""
    # Create client
    client = ComposerClient()

    # Create builder
    builder = DebianAtomicBuilder(client)

    try:
        # Build a base image
        image_path = builder.build_base_image("qcow2")
        print(f"Successfully built base image: {image_path}")
    except Exception as e:
        # Top-level boundary of the example script: report and exit normally.
        print(f"Build failed: {e}")


if __name__ == '__main__':
    main()