#!/usr/bin/python3
"""
Debian Forge Build Environment Manager

Manages isolated build environments, cleanup, and reuse policies for OSBuild builds.
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import shutil
|
|
import tempfile
|
|
import subprocess
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, asdict
|
|
from enum import Enum
|
|
|
|
|
|
class EnvironmentStatus(Enum):
    """Lifecycle states of a build environment, stored as lowercase strings."""

    # Environment is being set up / is set up and idle.
    CREATING = "creating"
    READY = "ready"

    # Environment is currently claimed.
    IN_USE = "in_use"

    # Environment is being torn down / has been torn down.
    CLEANING = "cleaning"
    CLEANED = "cleaned"

    # Something went wrong with the environment.
    FAILED = "failed"
|
|
|
|
|
|
@dataclass
class BuildEnvironment:
    """Record describing one build environment."""

    id: str
    base_path: str
    status: EnvironmentStatus
    created_at: datetime
    last_used: Optional[datetime] = None
    use_count: int = 0
    metadata: Optional[Dict[str, Any]] = None
    resource_usage: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary form of this record for serialization.

        ``status`` is replaced by its enum value and ``created_at`` by its
        ISO-8601 string; ``last_used`` is converted the same way when set.
        """
        serialized = asdict(self)
        serialized.update(
            status=self.status.value,
            created_at=self.created_at.isoformat(),
        )
        if self.last_used:
            serialized['last_used'] = self.last_used.isoformat()
        return serialized
|
|
|
|
|
|
class EnvironmentIsolation:
    """Create, verify and remove the on-disk layout of build environments.

    Every environment is a directory ``<base_dir>/<env_id>`` containing the
    sub-directories in ``ISOLATION_DIRS`` and the descriptor files in
    ``ISOLATION_FILES``.
    """

    # Sub-directories created for (and required of) every environment.
    ISOLATION_DIRS = ("rootfs", "overlay", "work", "metadata")
    # Descriptor files, relative to the environment root.
    ISOLATION_FILES = ("metadata/environment.conf", "metadata/mounts")

    def __init__(self, base_dir: str = "build-environments"):
        """Remember ``base_dir`` and make sure it exists.

        Args:
            base_dir: Directory under which all environments are created.
        """
        self.base_dir = Path(base_dir)
        # parents=True so a nested base directory (e.g. "var/lib/envs") works
        # even when its parent directories do not exist yet.
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def create_isolated_environment(self, env_id: str, base_image: Optional[str] = None) -> str:
        """Create the directory skeleton for environment ``env_id``.

        Args:
            env_id: Name of the environment directory below ``base_dir``.
            base_image: Optional path to a base image; it is copied into the
                environment only if the path exists.

        Returns:
            The environment path as a string.
        """
        env_path = self.base_dir / env_id
        env_path.mkdir(parents=True, exist_ok=True)

        # Create isolation structure
        for dir_name in self.ISOLATION_DIRS:
            (env_path / dir_name).mkdir(exist_ok=True)

        # Create basic isolation files
        self._create_isolation_files(env_path)

        # If base image provided, extract it
        if base_image and os.path.exists(base_image):
            self._extract_base_image(env_path, base_image)

        return str(env_path)

    def _create_isolation_files(self, env_path: Path):
        """Write ``metadata/environment.conf`` and ``metadata/mounts``."""
        metadata_dir = env_path / "metadata"
        metadata_dir.mkdir(exist_ok=True)

        # Environment configuration
        with open(metadata_dir / "environment.conf", 'w', encoding="utf-8") as f:
            f.write("# Build environment configuration\n")
            f.write(f"created_at: {datetime.now().isoformat()}\n")
            f.write("isolation_level: strict\n")
            f.write(f"base_path: {env_path}\n")

        # Mount points file
        with open(metadata_dir / "mounts", 'w', encoding="utf-8") as f:
            f.write("# Mount points for isolation\n")
            f.write("/proc\n")
            f.write("/sys\n")
            f.write("/dev\n")
            f.write("/tmp\n")

    def _extract_base_image(self, env_path: Path, base_image: str):
        """Copy ``base_image`` to ``rootfs/base-image`` (best effort).

        Failures are reported as a warning on stdout and otherwise ignored.
        """
        try:
            # For now, just copy the base image
            # In a real implementation, this would extract and mount
            shutil.copy2(base_image, env_path / "rootfs" / "base-image")
        except Exception as e:
            # Deliberately best-effort: a missing base image must not abort
            # environment creation.
            print(f"Warning: Failed to extract base image: {e}")

    def cleanup_environment(self, env_path: str) -> bool:
        """Delete the environment directory tree.

        Returns:
            True if the directory existed and was removed; False if it did
            not exist or removal raised an error (which is printed).
        """
        try:
            env_path_obj = Path(env_path)
            if not env_path_obj.exists():
                return False
            shutil.rmtree(env_path_obj)
            return True
        except Exception as e:
            # Callers rely on a boolean result rather than an exception.
            print(f"Error cleaning up environment {env_path}: {e}")
            return False

    def verify_isolation(self, env_path: str) -> bool:
        """Check that the environment has every required directory and file.

        Returns:
            True only if all entries of ``ISOLATION_DIRS`` are directories
            and all entries of ``ISOLATION_FILES`` are regular files.
        """
        env_path_obj = Path(env_path)

        # Check isolation structure
        if not all((env_path_obj / name).is_dir() for name in self.ISOLATION_DIRS):
            return False

        # Check isolation files
        return all((env_path_obj / name).is_file() for name in self.ISOLATION_FILES)
|
|
|
|
|
|
class EnvironmentCleanup:
    """Handles automatic environment cleanup.

    Runs ``_perform_cleanup`` periodically on a daemon thread and decides,
    via ``should_cleanup_environment``, whether an environment has exceeded
    the configured age or use-count limits.
    """

    # Values used for any key the caller's policy does not provide.
    DEFAULT_POLICY = {
        "max_age_days": 7,
        "max_use_count": 10,
        "cleanup_interval_hours": 24,
        "keep_minimum": 2,
    }

    def __init__(self, cleanup_policy: Optional[Dict[str, Any]] = None):
        """Store the cleanup policy.

        Args:
            cleanup_policy: Optional overrides; keys that are missing fall
                back to ``DEFAULT_POLICY`` so a partial policy cannot cause a
                ``KeyError`` later on.
        """
        self.cleanup_policy = {**self.DEFAULT_POLICY, **(cleanup_policy or {})}
        self.cleanup_thread = None
        self.running = False
        # Set by stop_cleanup_scheduler() to wake the loop out of its wait.
        self._stop_event = threading.Event()

    def start_cleanup_scheduler(self):
        """Start the cleanup scheduler thread (no-op if already running)."""
        if self.cleanup_thread is None or not self.cleanup_thread.is_alive():
            self._stop_event.clear()
            self.running = True
            self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
            self.cleanup_thread.start()

    def stop_cleanup_scheduler(self):
        """Stop the cleanup scheduler and wait for its thread to exit.

        The loop waits on an event rather than ``time.sleep``, so this returns
        promptly instead of blocking for the rest of the cleanup interval.
        """
        self.running = False
        self._stop_event.set()
        if self.cleanup_thread and self.cleanup_thread.is_alive():
            self.cleanup_thread.join()

    def _cleanup_loop(self):
        """Main cleanup loop: run a cleanup pass, then wait for the interval."""
        while self.running and not self._stop_event.is_set():
            try:
                self._perform_cleanup()
                # Wait for cleanup interval
                delay = self.cleanup_policy["cleanup_interval_hours"] * 3600
            except Exception as e:
                print(f"Cleanup error: {e}")
                delay = 3600  # Wait for 1 hour on error
            # Interruptible wait: returns as soon as the stop event is set.
            self._stop_event.wait(delay)

    def _perform_cleanup(self):
        """Perform actual cleanup operations"""
        # This would be implemented to work with the environment manager
        # For now, it's a placeholder
        pass

    def should_cleanup_environment(self, env: "BuildEnvironment") -> bool:
        """Determine if an environment should be cleaned up.

        Args:
            env: Object exposing ``created_at`` (datetime) and ``use_count``.

        Returns:
            True if the environment is older than ``max_age_days`` whole days
            or has been used more than ``max_use_count`` times.
        """
        now = datetime.now()

        # Check age
        age_days = (now - env.created_at).days
        if age_days > self.cleanup_policy["max_age_days"]:
            return True

        # Check use count
        return env.use_count > self.cleanup_policy["max_use_count"]
|
|
|
|
|
|
class HostMonitoring:
    """Monitors build host health and resource usage.

    Health checks are zero-argument callables returning a dict; a check is
    considered failed when its result has a falsy ``"healthy"`` entry or
    when it raises.
    """

    def __init__(self):
        self.health_checks = []
        self.monitoring_interval = 30  # seconds

    def add_health_check(self, check_func):
        """Add a health check function"""
        self.health_checks.append(check_func)

    @staticmethod
    def _check_name(check_func) -> str:
        """Return a report key for ``check_func``.

        Plain functions and bound methods use ``__name__``. Callables without
        one (``functools.partial`` objects, callable instances) fall back to
        the wrapped function's name or the type name, so that looking up the
        name can never raise while a report is being built.
        """
        name = getattr(check_func, "__name__", None)
        if name is None:
            name = getattr(getattr(check_func, "func", None), "__name__", None)
        return name or type(check_func).__name__

    def get_host_health(self) -> Dict[str, Any]:
        """Get current host health status.

        Returns:
            A dict with ``timestamp``, ``overall_status`` ("healthy" or
            "unhealthy"), per-check results under ``checks`` and the output
            of ``_get_resource_usage`` under ``resource_usage``.
        """
        health_status = {
            "timestamp": datetime.now().isoformat(),
            "overall_status": "healthy",
            "checks": {},
            "resource_usage": self._get_resource_usage()
        }

        # Run health checks
        for check_func in self.health_checks:
            # Resolved outside the try block: the error path needs it too.
            check_name = self._check_name(check_func)
            try:
                check_result = check_func()
                health_status["checks"][check_name] = check_result

                if not check_result.get("healthy", True):
                    health_status["overall_status"] = "unhealthy"
            except Exception as e:
                # A failing check marks the host unhealthy instead of
                # aborting the whole report.
                health_status["checks"][check_name] = {
                    "healthy": False,
                    "error": str(e)
                }
                health_status["overall_status"] = "unhealthy"

        return health_status

    def _get_resource_usage(self) -> Dict[str, Any]:
        """Get current resource usage.

        Returns CPU, memory and root-filesystem figures from ``psutil``, or
        ``{"error": "psutil not available"}`` when psutil cannot be imported.
        Note that the CPU sample uses a one-second measuring interval.
        """
        try:
            import psutil

            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')

            return {
                "cpu_percent": cpu_percent,
                "memory_percent": memory.percent,
                "memory_available_gb": memory.available / (1024**3),
                "disk_percent": disk.percent,
                "disk_free_gb": disk.free / (1024**3)
            }
        except ImportError:
            return {"error": "psutil not available"}

    def check_disk_space(self) -> Dict[str, Any]:
        """Check available disk space on ``/`` (healthy above 5 GiB free)."""
        try:
            import psutil
            disk = psutil.disk_usage('/')
            free_gb = disk.free / (1024**3)

            return {
                "healthy": free_gb > 5.0,  # Need at least 5GB free
                "free_gb": free_gb,
                "threshold_gb": 5.0
            }
        except ImportError:
            return {"healthy": False, "error": "psutil not available"}

    def check_memory_usage(self) -> Dict[str, Any]:
        """Check memory usage (healthy above 2 GiB available)."""
        try:
            import psutil
            memory = psutil.virtual_memory()
            available_gb = memory.available / (1024**3)

            return {
                "healthy": available_gb > 2.0,  # Need at least 2GB free
                "available_gb": available_gb,
                "threshold_gb": 2.0
            }
        except ImportError:
            return {"healthy": False, "error": "psutil not available"}

    def check_cpu_usage(self) -> Dict[str, Any]:
        """Check CPU usage (healthy below 90 percent over a 1 s sample)."""
        try:
            import psutil
            cpu_percent = psutil.cpu_percent(interval=1)

            return {
                "healthy": cpu_percent < 90.0,  # CPU should be less than 90%
                "cpu_percent": cpu_percent,
                "threshold_percent": 90.0
            }
        except ImportError:
            return {"healthy": False, "error": "psutil not available"}
|
|
|
|
|
|
class BuildEnvironmentManager:
    """Main build environment management system.

    Combines directory isolation, the cleanup scheduler and host monitoring,
    and keeps an in-memory registry of environments keyed by their id.
    """

    def __init__(self, base_dir: str = "build-environments"):
        """Set up the helpers, register default health checks and start the
        cleanup scheduler.

        Args:
            base_dir: Directory under which environments are created.
        """
        self.isolation = EnvironmentIsolation(base_dir)
        self.cleanup = EnvironmentCleanup()
        self.monitoring = HostMonitoring()
        self.environments: Dict[str, BuildEnvironment] = {}
        # Re-entrant on purpose: cleanup_old_environments() and shutdown()
        # call cleanup_environment() while already holding this lock, which
        # would deadlock with a plain threading.Lock.
        self.lock = threading.RLock()

        # Add default health checks
        self.monitoring.add_health_check(self.monitoring.check_disk_space)
        self.monitoring.add_health_check(self.monitoring.check_memory_usage)
        self.monitoring.add_health_check(self.monitoring.check_cpu_usage)

        # Start cleanup scheduler
        self.cleanup.start_cleanup_scheduler()

    def create_environment(self, env_id: str, base_image: Optional[str] = None) -> str:
        """Create a new build environment and register it.

        Args:
            env_id: Unique id of the new environment.
            base_image: Optional base image path passed to the isolation layer.

        Returns:
            The environment's path. The record's status is READY when
            isolation verification succeeds and FAILED otherwise.

        Raises:
            ValueError: If ``env_id`` is already registered.
        """
        with self.lock:
            if env_id in self.environments:
                raise ValueError(f"Environment {env_id} already exists")

            # Create environment
            env_path = self.isolation.create_isolated_environment(env_id, base_image)

            # Create environment record
            env = BuildEnvironment(
                id=env_id,
                base_path=env_path,
                status=EnvironmentStatus.CREATING,
                created_at=datetime.now()
            )
            self.environments[env_id] = env

            # Verify isolation
            if self.isolation.verify_isolation(env_path):
                env.status = EnvironmentStatus.READY
                print(f"✅ Environment {env_id} created successfully")
            else:
                env.status = EnvironmentStatus.FAILED
                print(f"❌ Environment {env_id} isolation verification failed")

            return env_path

    def get_environment(self, env_id: str) -> Optional[BuildEnvironment]:
        """Get an environment by ID, or None if it is not registered."""
        return self.environments.get(env_id)

    def use_environment(self, env_id: str) -> bool:
        """Mark environment as in use.

        Returns:
            True if the environment existed and was READY; it is then set to
            IN_USE, ``last_used`` is updated and ``use_count`` incremented.
            False otherwise.
        """
        with self.lock:
            env = self.environments.get(env_id)
            if env and env.status == EnvironmentStatus.READY:
                env.status = EnvironmentStatus.IN_USE
                env.last_used = datetime.now()
                env.use_count += 1
                return True
            return False

    def release_environment(self, env_id: str) -> bool:
        """Release environment back to ready state.

        Returns:
            True if the environment existed and was IN_USE, False otherwise.
        """
        with self.lock:
            env = self.environments.get(env_id)
            if env and env.status == EnvironmentStatus.IN_USE:
                env.status = EnvironmentStatus.READY
                return True
            return False

    def cleanup_environment(self, env_id: str) -> bool:
        """Clean up a specific environment.

        Removes the environment's files and, on success, drops it from the
        registry. On failure the record stays registered with status FAILED.

        Returns:
            True if the files were removed; False if the id is unknown or
            removal failed.
        """
        with self.lock:
            env = self.environments.get(env_id)
            if not env:
                return False

            env.status = EnvironmentStatus.CLEANING

            # Clean up files
            if self.isolation.cleanup_environment(env.base_path):
                env.status = EnvironmentStatus.CLEANED
                del self.environments[env_id]
                print(f"✅ Environment {env_id} cleaned up successfully")
                return True

            env.status = EnvironmentStatus.FAILED
            print(f"❌ Failed to clean up environment {env_id}")
            return False

    def get_available_environments(self) -> List[BuildEnvironment]:
        """Get list of available (READY) environments."""
        with self.lock:
            return [env for env in self.environments.values()
                    if env.status == EnvironmentStatus.READY]

    def get_host_health(self) -> Dict[str, Any]:
        """Get current host health status"""
        return self.monitoring.get_host_health()

    def cleanup_old_environments(self) -> int:
        """Clean up old environments based on policy.

        Returns:
            Number of environments that were successfully cleaned up.
        """
        cleaned_count = 0

        with self.lock:
            # Collect ids first: cleanup_environment() mutates the registry,
            # which must not happen while iterating over it.
            envs_to_cleanup = [
                env.id for env in self.environments.values()
                if self.cleanup.should_cleanup_environment(env)
            ]

            for env_id in envs_to_cleanup:
                if self.cleanup_environment(env_id):
                    cleaned_count += 1

        return cleaned_count

    def shutdown(self):
        """Shutdown the environment manager.

        Stops the cleanup scheduler, then cleans up every registered
        environment.
        """
        self.cleanup.stop_cleanup_scheduler()

        # Clean up all environments
        with self.lock:
            # Iterate over a snapshot; successful cleanups delete entries.
            for env_id in list(self.environments.keys()):
                self.cleanup_environment(env_id)
|
|
|
|
|
|
def main():
    """Walk through the manager's lifecycle with a single demo environment."""
    env_name = "test-env-001"

    print("Debian Forge Build Environment Manager")
    print("=" * 50)

    # The manager starts its cleanup scheduler on construction, so it is
    # always shut down in the finally clause below.
    manager = BuildEnvironmentManager()

    try:
        # Step 1: create the demo environment
        print("Creating test environment...")
        manager.create_environment(env_name)

        # Step 2: report host health
        print("\nChecking host health...")
        report = manager.get_host_health()
        print(f"Overall status: {report['overall_status']}")
        print(f"Resource usage: {report['resource_usage']}")

        # Step 3: claim the environment, then hand it back
        print("\nUsing environment...")
        claimed = manager.use_environment(env_name)
        if claimed:
            print("✅ Environment marked as in use")

        released = manager.release_environment(env_name)
        if released:
            print("✅ Environment released")

        # Step 4: count environments that are ready for use
        ready_count = len(manager.get_available_environments())
        print(f"\nAvailable environments: {ready_count}")

        # Step 5: remove the demo environment
        print("\nCleaning up test environment...")
        removed = manager.cleanup_environment(env_name)
        if removed:
            print("✅ Test environment cleaned up")
    finally:
        manager.shutdown()


# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|