debian-forge-composer/composer_integration.py
robojerk 4eeaa43c39
Some checks failed
Tests / 🛃 Unit tests (push) Failing after 13s
Tests / 🗄 DB tests (push) Failing after 19s
Tests / 🐍 Lint python scripts (push) Failing after 1s
Tests / ⌨ Golang Lint (push) Failing after 1s
Tests / 📦 Packit config lint (push) Failing after 1s
Tests / 🔍 Check source preparation (push) Failing after 1s
Tests / 🔍 Check for valid snapshot urls (push) Failing after 1s
Tests / 🔍 Check for missing or unused runner repos (push) Failing after 1s
Tests / 🐚 Shellcheck (push) Failing after 1s
Tests / 📦 RPMlint (push) Failing after 1s
Tests / Gitlab CI trigger helper (push) Failing after 1s
Tests / 🎀 kube-linter (push) Failing after 1s
Tests / 🧹 cloud-cleaner-is-enabled (push) Successful in 3s
Tests / 🔍 Check spec file osbuild/images dependencies (push) Failing after 1s
did stuff
2025-08-26 10:34:42 -07:00

391 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Debian Forge Composer Integration Module
This module provides integration between debian-forge and debian-forge-composer,
ensuring 1:1 compatibility with the upstream osbuild/osbuild-composer.
"""
import json
import subprocess
import time
import requests
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pathlib import Path
@dataclass
class ComposerBuildRequest:
"""Build request for composer integration"""
blueprint_name: str
compose_type: str
architecture: str = "x86_64"
distro: str = "debian-12"
size: int = 0
upload: bool = False
metadata: Optional[Dict[str, Any]] = None
@dataclass
class ComposerBuildStatus:
"""Build status from composer"""
id: str
status: str
blueprint_name: str
compose_type: str
architecture: str
created_at: str
started_at: Optional[str] = None
finished_at: Optional[str] = None
error_message: Optional[str] = None
class DebianForgeComposer:
"""Integration with debian-forge-composer (fork of osbuild/osbuild-composer)"""
def __init__(self, composer_path: str = "../debian-forge-composer",
api_url: str = "http://localhost:8700",
api_version: str = "v1"):
self.composer_path = Path(composer_path)
self.api_url = api_url.rstrip('/')
self.api_version = api_version
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json'
})
# Verify composer path exists
if not self.composer_path.exists():
raise FileNotFoundError(f"Composer path not found: {composer_path}")
def start_composer_service(self, config_file: Optional[str] = None) -> bool:
"""Start the composer service"""
try:
cmd = [str(self.composer_path / "cmd" / "osbuild-composer" / "osbuild-composer")]
if config_file:
cmd.extend(["--config", config_file])
# Start service in background
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(self.composer_path)
)
# Wait a moment for service to start
time.sleep(2)
# Check if service is responding
try:
response = requests.get(f"{self.api_url}/api/{self.api_version}/status", timeout=5)
return response.status_code == 200
except requests.exceptions.RequestException:
return False
except Exception as e:
print(f"Failed to start composer service: {e}")
return False
def get_service_status(self) -> Dict[str, Any]:
"""Get composer service status"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/status")
if response.status_code == 200:
return response.json()
else:
return {"status": "error", "code": response.status_code}
except requests.exceptions.RequestException as e:
return {"status": "error", "message": str(e)}
def submit_blueprint(self, blueprint_data: Dict[str, Any]) -> Dict[str, Any]:
"""Submit a blueprint to composer"""
try:
response = self.session.post(
f"{self.api_url}/api/{self.api_version}/blueprints/new",
json=blueprint_data
)
if response.status_code == 201:
return response.json()
else:
return {"error": f"Failed to submit blueprint: {response.status_code}", "details": response.text}
except requests.exceptions.RequestException as e:
return {"error": f"Request failed: {e}"}
def get_blueprint(self, blueprint_name: str) -> Dict[str, Any]:
"""Get blueprint details"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/blueprints/info/{blueprint_name}")
if response.status_code == 200:
return response.json()
else:
return {"error": f"Failed to get blueprint: {response.status_code}"}
except requests.exceptions.RequestException as e:
return {"error": f"Request failed: {e}"}
def list_blueprints(self) -> List[str]:
"""List all available blueprints"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/blueprints/list")
if response.status_code == 200:
return response.json()
else:
return []
except requests.exceptions.RequestException:
return []
def start_compose(self, request: ComposerBuildRequest) -> Dict[str, Any]:
"""Start a compose using composer"""
compose_data = {
"blueprint_name": request.blueprint_name,
"compose_type": request.compose_type,
"branch": "main",
"distro": request.distro,
"arch": request.architecture,
"image_type": request.compose_type,
"size": request.size,
"upload": request.upload
}
if request.metadata:
compose_data["metadata"] = request.metadata
try:
response = self.session.post(
f"{self.api_url}/api/{self.api_version}/compose",
json=compose_data
)
if response.status_code == 201:
return response.json()
else:
return {"error": f"Failed to start compose: {response.status_code}", "details": response.text}
except requests.exceptions.RequestException as e:
return {"error": f"Request failed: {e}"}
def get_compose_status(self, compose_id: str) -> ComposerBuildStatus:
"""Get compose status"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/status/{compose_id}")
if response.status_code == 200:
data = response.json()
return ComposerBuildStatus(
id=data.get("id", compose_id),
status=data.get("status", "unknown"),
blueprint_name=data.get("blueprint", ""),
compose_type=data.get("image_type", ""),
architecture=data.get("arch", ""),
created_at=data.get("created_at", ""),
started_at=data.get("started_at"),
finished_at=data.get("finished_at"),
error_message=data.get("error", {}).get("message") if data.get("error") else None
)
else:
return ComposerBuildStatus(
id=compose_id,
status="error",
blueprint_name="",
compose_type="",
architecture="",
created_at="",
error_message=f"HTTP {response.status_code}"
)
except requests.exceptions.RequestException as e:
return ComposerBuildStatus(
id=compose_id,
status="error",
blueprint_name="",
compose_type="",
architecture="",
created_at="",
error_message=str(e)
)
def list_composes(self) -> List[Dict[str, Any]]:
"""List all composes"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/list")
if response.status_code == 200:
return response.json()
else:
return []
except requests.exceptions.RequestException:
return []
def cancel_compose(self, compose_id: str) -> bool:
"""Cancel a compose"""
try:
response = self.session.delete(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}")
return response.status_code == 200
except requests.exceptions.RequestException:
return False
def get_compose_logs(self, compose_id: str) -> Dict[str, Any]:
"""Get compose logs"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}/logs")
if response.status_code == 200:
return response.json()
else:
return {"error": f"Failed to get logs: {response.status_code}"}
except requests.exceptions.RequestException as e:
return {"error": f"Request failed: {e}"}
def get_compose_metadata(self, compose_id: str) -> Dict[str, Any]:
"""Get compose metadata"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}/metadata")
if response.status_code == 200:
return response.json()
else:
return {"error": f"Failed to get metadata: {response.status_code}"}
except requests.exceptions.RequestException as e:
return {"error": f"Request failed: {e}"}
def create_debian_blueprint(self, name: str, version: str,
packages: List[str],
customizations: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Create a Debian-specific blueprint for composer"""
blueprint = {
"name": name,
"version": version,
"description": f"Debian atomic blueprint for {name}",
"packages": packages,
"modules": [],
"groups": [],
"customizations": customizations or {}
}
# Add Debian-specific customizations
if "debian" not in blueprint["customizations"]:
blueprint["customizations"]["debian"] = {
"repositories": [
{
"name": "debian-main",
"baseurl": "http://deb.debian.org/debian",
"enabled": True
}
]
}
return blueprint
def test_composer_integration(self) -> Dict[str, Any]:
"""Test composer integration functionality"""
results = {
"composer_path_exists": self.composer_path.exists(),
"api_accessible": False,
"blueprint_operations": False,
"compose_operations": False
}
# Test API accessibility
try:
status = self.get_service_status()
if "status" in status and status["status"] != "error":
results["api_accessible"] = True
except Exception:
pass
# Test blueprint operations
if results["api_accessible"]:
try:
# Create test blueprint
test_blueprint = self.create_debian_blueprint(
"test-integration", "1.0.0", ["bash", "coreutils"]
)
# Submit blueprint
submit_result = self.submit_blueprint(test_blueprint)
if "error" not in submit_result:
results["blueprint_operations"] = True
# Clean up test blueprint
# Note: Composer doesn't have a delete blueprint endpoint in standard API
except Exception as e:
results["blueprint_error"] = str(e)
# Test compose operations
if results["blueprint_operations"]:
try:
# Try to start a compose (may fail due to missing distro/repo config)
compose_request = ComposerBuildRequest(
blueprint_name="test-integration",
compose_type="qcow2"
)
compose_result = self.start_compose(compose_request)
if "error" not in compose_result:
results["compose_operations"] = True
else:
results["compose_error"] = compose_result["error"]
except Exception as e:
results["compose_error"] = str(e)
return results
def get_composer_version(self) -> str:
"""Get composer version information"""
try:
# Try to get version from API
response = self.session.get(f"{self.api_url}/api/{self.api_version}/version")
if response.status_code == 200:
data = response.json()
return data.get("version", "unknown")
except:
pass
# Fallback: try to get version from binary
try:
composer_binary = self.composer_path / "cmd" / "osbuild-composer" / "osbuild-composer"
if composer_binary.exists():
result = subprocess.run([str(composer_binary), "--version"],
capture_output=True, text=True, check=True)
return result.stdout.strip()
except:
pass
return "Version unknown"
def get_system_info(self) -> Dict[str, Any]:
"""Get system information from composer"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/system/info")
if response.status_code == 200:
return response.json()
else:
return {"error": f"Failed to get system info: {response.status_code}"}
except requests.exceptions.RequestException as e:
return {"error": f"Request failed: {e}"}
def get_worker_status(self) -> Dict[str, Any]:
"""Get worker status from composer"""
try:
response = self.session.get(f"{self.api_url}/api/{self.api_version}/workers/status")
if response.status_code == 200:
return response.json()
else:
return {"error": f"Failed to get worker status: {response.status_code}"}
except requests.exceptions.RequestException as e:
return {"error": f"Request failed: {e}"}