Some checks failed
Tests / 🛃 Unit tests (push) Failing after 13s
Tests / 🗄 DB tests (push) Failing after 19s
Tests / 🐍 Lint python scripts (push) Failing after 1s
Tests / ⌨ Golang Lint (push) Failing after 1s
Tests / 📦 Packit config lint (push) Failing after 1s
Tests / 🔍 Check source preparation (push) Failing after 1s
Tests / 🔍 Check for valid snapshot urls (push) Failing after 1s
Tests / 🔍 Check for missing or unused runner repos (push) Failing after 1s
Tests / 🐚 Shellcheck (push) Failing after 1s
Tests / 📦 RPMlint (push) Failing after 1s
Tests / Gitlab CI trigger helper (push) Failing after 1s
Tests / 🎀 kube-linter (push) Failing after 1s
Tests / 🧹 cloud-cleaner-is-enabled (push) Successful in 3s
Tests / 🔍 Check spec file osbuild/images dependencies (push) Failing after 1s
391 lines
14 KiB
Python
391 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Debian Forge Composer Integration Module
|
|
|
|
This module provides integration between debian-forge and debian-forge-composer,
|
|
ensuring 1:1 compatibility with the upstream osbuild/osbuild-composer.
|
|
"""
|
|
|
|
import json
|
|
import subprocess
|
|
import time
|
|
import requests
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
@dataclass
|
|
class ComposerBuildRequest:
|
|
"""Build request for composer integration"""
|
|
blueprint_name: str
|
|
compose_type: str
|
|
architecture: str = "x86_64"
|
|
distro: str = "debian-12"
|
|
size: int = 0
|
|
upload: bool = False
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
|
|
@dataclass
|
|
class ComposerBuildStatus:
|
|
"""Build status from composer"""
|
|
id: str
|
|
status: str
|
|
blueprint_name: str
|
|
compose_type: str
|
|
architecture: str
|
|
created_at: str
|
|
started_at: Optional[str] = None
|
|
finished_at: Optional[str] = None
|
|
error_message: Optional[str] = None
|
|
|
|
class DebianForgeComposer:
|
|
"""Integration with debian-forge-composer (fork of osbuild/osbuild-composer)"""
|
|
|
|
def __init__(self, composer_path: str = "../debian-forge-composer",
|
|
api_url: str = "http://localhost:8700",
|
|
api_version: str = "v1"):
|
|
self.composer_path = Path(composer_path)
|
|
self.api_url = api_url.rstrip('/')
|
|
self.api_version = api_version
|
|
self.session = requests.Session()
|
|
self.session.headers.update({
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'application/json'
|
|
})
|
|
|
|
# Verify composer path exists
|
|
if not self.composer_path.exists():
|
|
raise FileNotFoundError(f"Composer path not found: {composer_path}")
|
|
|
|
def start_composer_service(self, config_file: Optional[str] = None) -> bool:
|
|
"""Start the composer service"""
|
|
try:
|
|
cmd = [str(self.composer_path / "cmd" / "osbuild-composer" / "osbuild-composer")]
|
|
|
|
if config_file:
|
|
cmd.extend(["--config", config_file])
|
|
|
|
# Start service in background
|
|
process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
cwd=str(self.composer_path)
|
|
)
|
|
|
|
# Wait a moment for service to start
|
|
time.sleep(2)
|
|
|
|
# Check if service is responding
|
|
try:
|
|
response = requests.get(f"{self.api_url}/api/{self.api_version}/status", timeout=5)
|
|
return response.status_code == 200
|
|
except requests.exceptions.RequestException:
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"Failed to start composer service: {e}")
|
|
return False
|
|
|
|
def get_service_status(self) -> Dict[str, Any]:
|
|
"""Get composer service status"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/status")
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
return {"status": "error", "code": response.status_code}
|
|
except requests.exceptions.RequestException as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
def submit_blueprint(self, blueprint_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Submit a blueprint to composer"""
|
|
try:
|
|
response = self.session.post(
|
|
f"{self.api_url}/api/{self.api_version}/blueprints/new",
|
|
json=blueprint_data
|
|
)
|
|
|
|
if response.status_code == 201:
|
|
return response.json()
|
|
else:
|
|
return {"error": f"Failed to submit blueprint: {response.status_code}", "details": response.text}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return {"error": f"Request failed: {e}"}
|
|
|
|
def get_blueprint(self, blueprint_name: str) -> Dict[str, Any]:
|
|
"""Get blueprint details"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/blueprints/info/{blueprint_name}")
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
return {"error": f"Failed to get blueprint: {response.status_code}"}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return {"error": f"Request failed: {e}"}
|
|
|
|
def list_blueprints(self) -> List[str]:
|
|
"""List all available blueprints"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/blueprints/list")
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
return []
|
|
|
|
except requests.exceptions.RequestException:
|
|
return []
|
|
|
|
def start_compose(self, request: ComposerBuildRequest) -> Dict[str, Any]:
|
|
"""Start a compose using composer"""
|
|
compose_data = {
|
|
"blueprint_name": request.blueprint_name,
|
|
"compose_type": request.compose_type,
|
|
"branch": "main",
|
|
"distro": request.distro,
|
|
"arch": request.architecture,
|
|
"image_type": request.compose_type,
|
|
"size": request.size,
|
|
"upload": request.upload
|
|
}
|
|
|
|
if request.metadata:
|
|
compose_data["metadata"] = request.metadata
|
|
|
|
try:
|
|
response = self.session.post(
|
|
f"{self.api_url}/api/{self.api_version}/compose",
|
|
json=compose_data
|
|
)
|
|
|
|
if response.status_code == 201:
|
|
return response.json()
|
|
else:
|
|
return {"error": f"Failed to start compose: {response.status_code}", "details": response.text}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return {"error": f"Request failed: {e}"}
|
|
|
|
def get_compose_status(self, compose_id: str) -> ComposerBuildStatus:
|
|
"""Get compose status"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/status/{compose_id}")
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return ComposerBuildStatus(
|
|
id=data.get("id", compose_id),
|
|
status=data.get("status", "unknown"),
|
|
blueprint_name=data.get("blueprint", ""),
|
|
compose_type=data.get("image_type", ""),
|
|
architecture=data.get("arch", ""),
|
|
created_at=data.get("created_at", ""),
|
|
started_at=data.get("started_at"),
|
|
finished_at=data.get("finished_at"),
|
|
error_message=data.get("error", {}).get("message") if data.get("error") else None
|
|
)
|
|
else:
|
|
return ComposerBuildStatus(
|
|
id=compose_id,
|
|
status="error",
|
|
blueprint_name="",
|
|
compose_type="",
|
|
architecture="",
|
|
created_at="",
|
|
error_message=f"HTTP {response.status_code}"
|
|
)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return ComposerBuildStatus(
|
|
id=compose_id,
|
|
status="error",
|
|
blueprint_name="",
|
|
compose_type="",
|
|
architecture="",
|
|
created_at="",
|
|
error_message=str(e)
|
|
)
|
|
|
|
def list_composes(self) -> List[Dict[str, Any]]:
|
|
"""List all composes"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/list")
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
return []
|
|
|
|
except requests.exceptions.RequestException:
|
|
return []
|
|
|
|
def cancel_compose(self, compose_id: str) -> bool:
|
|
"""Cancel a compose"""
|
|
try:
|
|
response = self.session.delete(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}")
|
|
return response.status_code == 200
|
|
except requests.exceptions.RequestException:
|
|
return False
|
|
|
|
def get_compose_logs(self, compose_id: str) -> Dict[str, Any]:
|
|
"""Get compose logs"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}/logs")
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
return {"error": f"Failed to get logs: {response.status_code}"}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return {"error": f"Request failed: {e}"}
|
|
|
|
def get_compose_metadata(self, compose_id: str) -> Dict[str, Any]:
|
|
"""Get compose metadata"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/compose/{compose_id}/metadata")
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
return {"error": f"Failed to get metadata: {response.status_code}"}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return {"error": f"Request failed: {e}"}
|
|
|
|
def create_debian_blueprint(self, name: str, version: str,
|
|
packages: List[str],
|
|
customizations: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
"""Create a Debian-specific blueprint for composer"""
|
|
blueprint = {
|
|
"name": name,
|
|
"version": version,
|
|
"description": f"Debian atomic blueprint for {name}",
|
|
"packages": packages,
|
|
"modules": [],
|
|
"groups": [],
|
|
"customizations": customizations or {}
|
|
}
|
|
|
|
# Add Debian-specific customizations
|
|
if "debian" not in blueprint["customizations"]:
|
|
blueprint["customizations"]["debian"] = {
|
|
"repositories": [
|
|
{
|
|
"name": "debian-main",
|
|
"baseurl": "http://deb.debian.org/debian",
|
|
"enabled": True
|
|
}
|
|
]
|
|
}
|
|
|
|
return blueprint
|
|
|
|
def test_composer_integration(self) -> Dict[str, Any]:
|
|
"""Test composer integration functionality"""
|
|
results = {
|
|
"composer_path_exists": self.composer_path.exists(),
|
|
"api_accessible": False,
|
|
"blueprint_operations": False,
|
|
"compose_operations": False
|
|
}
|
|
|
|
# Test API accessibility
|
|
try:
|
|
status = self.get_service_status()
|
|
if "status" in status and status["status"] != "error":
|
|
results["api_accessible"] = True
|
|
except Exception:
|
|
pass
|
|
|
|
# Test blueprint operations
|
|
if results["api_accessible"]:
|
|
try:
|
|
# Create test blueprint
|
|
test_blueprint = self.create_debian_blueprint(
|
|
"test-integration", "1.0.0", ["bash", "coreutils"]
|
|
)
|
|
|
|
# Submit blueprint
|
|
submit_result = self.submit_blueprint(test_blueprint)
|
|
if "error" not in submit_result:
|
|
results["blueprint_operations"] = True
|
|
|
|
# Clean up test blueprint
|
|
# Note: Composer doesn't have a delete blueprint endpoint in standard API
|
|
|
|
except Exception as e:
|
|
results["blueprint_error"] = str(e)
|
|
|
|
# Test compose operations
|
|
if results["blueprint_operations"]:
|
|
try:
|
|
# Try to start a compose (may fail due to missing distro/repo config)
|
|
compose_request = ComposerBuildRequest(
|
|
blueprint_name="test-integration",
|
|
compose_type="qcow2"
|
|
)
|
|
|
|
compose_result = self.start_compose(compose_request)
|
|
if "error" not in compose_result:
|
|
results["compose_operations"] = True
|
|
else:
|
|
results["compose_error"] = compose_result["error"]
|
|
|
|
except Exception as e:
|
|
results["compose_error"] = str(e)
|
|
|
|
return results
|
|
|
|
def get_composer_version(self) -> str:
|
|
"""Get composer version information"""
|
|
try:
|
|
# Try to get version from API
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/version")
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return data.get("version", "unknown")
|
|
except:
|
|
pass
|
|
|
|
# Fallback: try to get version from binary
|
|
try:
|
|
composer_binary = self.composer_path / "cmd" / "osbuild-composer" / "osbuild-composer"
|
|
if composer_binary.exists():
|
|
result = subprocess.run([str(composer_binary), "--version"],
|
|
capture_output=True, text=True, check=True)
|
|
return result.stdout.strip()
|
|
except:
|
|
pass
|
|
|
|
return "Version unknown"
|
|
|
|
def get_system_info(self) -> Dict[str, Any]:
|
|
"""Get system information from composer"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/system/info")
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
return {"error": f"Failed to get system info: {response.status_code}"}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return {"error": f"Request failed: {e}"}
|
|
|
|
def get_worker_status(self) -> Dict[str, Any]:
|
|
"""Get worker status from composer"""
|
|
try:
|
|
response = self.session.get(f"{self.api_url}/api/{self.api_version}/workers/status")
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
return {"error": f"Failed to get worker status: {response.status_code}"}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return {"error": f"Request failed: {e}"}
|