197 lines
6.7 KiB
Python
197 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Integration test script to demonstrate how deb-bootc-compose, deb-orchestrator, and deb-mock work together.
|
|
|
|
This script simulates the complete workflow:
|
|
1. deb-bootc-compose creates a compose request
|
|
2. deb-orchestrator manages the build tasks
|
|
3. deb-mock executes the builds in isolated environments
|
|
"""
|
|
|
|
import json
|
|
import requests
|
|
import subprocess
|
|
import time
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Configuration
|
|
ORCHESTRATOR_URL = "http://localhost:8080"
|
|
COMPOSE_CONFIG = {
|
|
"name": "debian-bootc-minimal",
|
|
"version": "1.0.0",
|
|
"architecture": "amd64",
|
|
"suite": "bookworm",
|
|
"packages": ["linux-image-amd64", "systemd", "udev"],
|
|
"repositories": ["http://deb.debian.org/debian/"],
|
|
"output_format": "ostree"
|
|
}
|
|
|
|
def test_orchestrator_health():
|
|
"""Test if deb-orchestrator hub is healthy"""
|
|
try:
|
|
response = requests.get(f"{ORCHESTRATOR_URL}/health")
|
|
if response.status_code == 200:
|
|
print("✅ deb-orchestrator hub is healthy")
|
|
return True
|
|
else:
|
|
print(f"❌ deb-orchestrator hub health check failed: {response.status_code}")
|
|
return False
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ Cannot connect to deb-orchestrator hub: {e}")
|
|
return False
|
|
|
|
def test_task_creation():
|
|
"""Test creating a build task in deb-orchestrator"""
|
|
try:
|
|
task_data = {
|
|
"method": "build",
|
|
"args": ["linux-image-amd64"],
|
|
"priority": 1,
|
|
"arch": "amd64",
|
|
"channel": "stable"
|
|
}
|
|
|
|
response = requests.post(f"{ORCHESTRATOR_URL}/api/v1/tasks", json=task_data)
|
|
if response.status_code == 201:
|
|
task = response.json()
|
|
print(f"✅ Created build task {task['id']}: {task['method']}")
|
|
return task['id']
|
|
else:
|
|
print(f"❌ Failed to create task: {response.status_code}")
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ Error creating task: {e}")
|
|
return None
|
|
|
|
def test_host_registration():
|
|
"""Test registering a build host in deb-orchestrator"""
|
|
try:
|
|
host_data = {
|
|
"name": "integration-test-host",
|
|
"arch": "amd64"
|
|
}
|
|
|
|
response = requests.post(f"{ORCHESTRATOR_URL}/api/v1/hosts", json=host_data)
|
|
if response.status_code == 201:
|
|
host = response.json()
|
|
print(f"✅ Registered build host {host['id']}: {host['name']}")
|
|
return host['id']
|
|
else:
|
|
print(f"❌ Failed to register host: {response.status_code}")
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ Error registering host: {e}")
|
|
return None
|
|
|
|
def test_task_lifecycle(task_id, host_id):
|
|
"""Test the complete task lifecycle"""
|
|
try:
|
|
# Assign task to host
|
|
response = requests.post(f"{ORCHESTRATOR_URL}/api/v1/tasks/{task_id}/assign")
|
|
if response.status_code == 200:
|
|
print(f"✅ Task {task_id} assigned to host {host_id}")
|
|
else:
|
|
print(f"❌ Failed to assign task: {response.status_code}")
|
|
return False
|
|
|
|
# Start task
|
|
response = requests.post(f"{ORCHESTRATOR_URL}/api/v1/tasks/{task_id}/start")
|
|
if response.status_code == 200:
|
|
print(f"✅ Task {task_id} started")
|
|
else:
|
|
print(f"❌ Failed to start task: {response.status_code}")
|
|
return False
|
|
|
|
# Simulate build execution (in real scenario, deb-mock would be called here)
|
|
print("🔄 Simulating build execution with deb-mock...")
|
|
time.sleep(2) # Simulate build time
|
|
|
|
# Complete task
|
|
result_data = {"result": "Build completed successfully"}
|
|
response = requests.post(f"{ORCHESTRATOR_URL}/api/v1/tasks/{task_id}/complete", json=result_data)
|
|
if response.status_code == 200:
|
|
print(f"✅ Task {task_id} completed")
|
|
return True
|
|
else:
|
|
print(f"❌ Failed to complete task: {response.status_code}")
|
|
return False
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ Error in task lifecycle: {e}")
|
|
return False
|
|
|
|
def test_deb_mock_integration():
|
|
"""Test deb-mock basic functionality"""
|
|
try:
|
|
# Check if deb-mock is available
|
|
result = subprocess.run(["python3", "-c", "import deb_mock"],
|
|
capture_output=True, text=True)
|
|
if result.returncode == 0:
|
|
print("✅ deb-mock module is available")
|
|
return True
|
|
else:
|
|
print("❌ deb-mock module not available")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Error testing deb-mock: {e}")
|
|
return False
|
|
|
|
def test_deb_bootc_compose():
|
|
"""Test deb-bootc-compose basic functionality"""
|
|
try:
|
|
# Check if deb-bootc-compose binary exists
|
|
compose_bin = Path("../deb-bootc-compose/build/deb-bootc-compose")
|
|
if compose_bin.exists():
|
|
print("✅ deb-bootc-compose binary found")
|
|
return True
|
|
else:
|
|
print("❌ deb-bootc-compose binary not found")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ Error testing deb-bootc-compose: {e}")
|
|
return False
|
|
|
|
def main():
|
|
"""Main integration test"""
|
|
print("🚀 Starting integration test for deb-bootc-compose ecosystem...")
|
|
print("=" * 60)
|
|
|
|
# Test 1: Orchestrator health
|
|
if not test_orchestrator_health():
|
|
print("❌ Orchestrator health check failed. Exiting.")
|
|
sys.exit(1)
|
|
|
|
# Test 2: Host registration
|
|
host_id = test_host_registration()
|
|
if not host_id:
|
|
print("❌ Host registration failed. Exiting.")
|
|
sys.exit(1)
|
|
|
|
# Test 3: Task creation
|
|
task_id = test_task_creation()
|
|
if not task_id:
|
|
print("❌ Task creation failed. Exiting.")
|
|
sys.exit(1)
|
|
|
|
# Test 4: Task lifecycle
|
|
if not test_task_lifecycle(task_id, host_id):
|
|
print("❌ Task lifecycle test failed. Exiting.")
|
|
sys.exit(1)
|
|
|
|
# Test 5: deb-mock integration
|
|
test_deb_mock_integration()
|
|
|
|
# Test 6: deb-bootc-compose integration
|
|
test_deb_bootc_compose()
|
|
|
|
print("=" * 60)
|
|
print("🎉 Integration test completed successfully!")
|
|
print("\n📋 Summary:")
|
|
print("✅ deb-orchestrator: Task management and host coordination")
|
|
print("✅ deb-mock: Build environment isolation (module available)")
|
|
print("✅ deb-bootc-compose: Compose orchestration (binary available)")
|
|
print("\n🔗 The three tools are ready for integration!")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|