particle-os-modules/debian_repository_manager.py
2025-08-26 10:13:20 -07:00

394 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Debian Repository Manager for Debian Forge
This module provides Debian repository management for OSBuild Composer,
handling repository configuration, mirror management, and package sources.
"""
import json
import os
import subprocess
import tempfile
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from pathlib import Path
import urllib.parse
from datetime import datetime
@dataclass
class DebianRepository:
    """Configuration record describing one Debian package repository.

    Instances are converted to plain dictionaries with ``asdict`` before
    DebianRepositoryManager stores them, so the field names below are also
    the keys of the persisted JSON entries.
    """

    # Unique identifier; the manager looks repositories up by this value.
    name: str
    # Base URL of the archive, emitted verbatim in generated "deb" lines.
    url: str
    # Suite name; compared against the suite requested by the generators.
    suite: str
    # Archive components offered by this repository (e.g. ["main"]).
    components: List[str]
    # Disabled repositories are skipped when sources/config are generated.
    enabled: bool = True
    # Numeric priority copied into the generated APT configuration.
    # NOTE(review): presumably an APT pin priority -- confirm with consumers.
    priority: int = 500
    # Optional string-to-string mapping; not interpreted by the manager.
    authentication: Optional[Dict[str, str]] = None
    # Optional per-repository proxy; not interpreted by the manager.
    proxy: Optional[str] = None
@dataclass
class RepositoryMirror:
    """Configuration record describing one Debian mirror.

    Instances are converted to plain dictionaries with ``asdict`` before
    DebianRepositoryManager stores them, so the field names below are also
    the keys of the persisted JSON entries.
    """

    # Unique identifier; the manager looks mirrors up by this value.
    name: str
    # Base URL of the mirror; the health check appends a dists/.../Release path.
    url: str
    # Free-form region label (the bundled defaults use "global").
    region: str
    # Protocol label; stored only, the manager does not derive it from ``url``.
    protocol: str = "http"
    # Disabled mirrors are omitted from get_enabled_mirrors().
    enabled: bool = True
    # When False, check_mirror_health() reports healthy without probing.
    health_check: bool = True
class DebianRepositoryManager:
    """Manages Debian repositories for composer builds.

    State lives in two dictionaries that are persisted as JSON files inside
    ``config_dir``:

    * ``self.repositories`` -- ``{"repositories": [<repository dict>, ...]}``
      stored in ``repositories.json``
    * ``self.mirrors`` -- ``{"mirrors": [<mirror dict>, ...]}`` stored in
      ``mirrors.json``

    Most mutating methods report success as a ``bool`` and print a message on
    failure instead of raising.
    """

    # Keys update_repository() may create on a stored entry even when the
    # entry does not have them yet. Mirrors the fields of the DebianRepository
    # dataclass except "name", which is the lookup key and cannot be passed
    # as a keyword argument to update_repository().
    _REPOSITORY_FIELDS = frozenset(
        {
            "url",
            "suite",
            "components",
            "enabled",
            "priority",
            "authentication",
            "proxy",
        }
    )

    def __init__(self, config_dir: str = "/etc/debian-forge/repositories"):
        """Create the configuration directory if needed and load its state.

        Args:
            config_dir: Directory holding ``repositories.json`` and
                ``mirrors.json``. It is created (with parents) when missing.

        Raises:
            OSError: If the directory or the files cannot be created/read.
            json.JSONDecodeError: If an existing file is not valid JSON.
        """
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(parents=True, exist_ok=True)
        self.repositories_file = self.config_dir / "repositories.json"
        self.mirrors_file = self.config_dir / "mirrors.json"
        self._load_configuration()

    def _load_configuration(self):
        """Load repository and mirror configuration.

        A missing file is replaced by the built-in defaults, which are
        written to disk immediately.
        """
        # Load repositories
        if self.repositories_file.exists():
            with open(self.repositories_file, 'r', encoding="utf-8") as f:
                self.repositories = json.load(f)
        else:
            self.repositories = self._get_default_repositories()
            self._save_repositories()

        # Load mirrors
        if self.mirrors_file.exists():
            with open(self.mirrors_file, 'r', encoding="utf-8") as f:
                self.mirrors = json.load(f)
        else:
            self.mirrors = self._get_default_mirrors()
            self._save_mirrors()

    def _get_default_repositories(self) -> Dict[str, Any]:
        """Get default Debian repository configuration (bookworm suites)."""
        return {
            "repositories": [
                {
                    "name": "debian-main",
                    "url": "http://deb.debian.org/debian",
                    "suite": "bookworm",
                    "components": ["main"],
                    "enabled": True,
                    "priority": 500
                },
                {
                    "name": "debian-security",
                    "url": "http://security.debian.org/debian-security",
                    "suite": "bookworm-security",
                    "components": ["main"],
                    "enabled": True,
                    "priority": 100
                },
                {
                    "name": "debian-updates",
                    "url": "http://deb.debian.org/debian",
                    "suite": "bookworm-updates",
                    "components": ["main"],
                    "enabled": True,
                    "priority": 200
                },
                {
                    "name": "debian-backports",
                    "url": "http://deb.debian.org/debian",
                    "suite": "bookworm-backports",
                    "components": ["main", "contrib", "non-free-firmware"],
                    "enabled": False,
                    "priority": 300
                }
            ]
        }

    def _get_default_mirrors(self) -> Dict[str, Any]:
        """Get default Debian mirror configuration."""
        return {
            "mirrors": [
                {
                    "name": "debian-official",
                    "url": "http://deb.debian.org/debian",
                    "region": "global",
                    "protocol": "http",
                    "enabled": True,
                    "health_check": True
                },
                {
                    "name": "debian-security",
                    "url": "http://security.debian.org/debian-security",
                    "region": "global",
                    "protocol": "http",
                    "enabled": True,
                    "health_check": True
                }
            ]
        }

    def _save_repositories(self):
        """Save repository configuration to file."""
        with open(self.repositories_file, 'w', encoding="utf-8") as f:
            json.dump(self.repositories, f, indent=2)

    def _save_mirrors(self):
        """Save mirror configuration to file."""
        with open(self.mirrors_file, 'w', encoding="utf-8") as f:
            json.dump(self.mirrors, f, indent=2)

    def add_repository(self, repo: "DebianRepository") -> bool:
        """Add a new repository and persist the configuration.

        Args:
            repo: Dataclass instance; it is stored as ``asdict(repo)``.

        Returns:
            True when the repository was added and saved; False when a
            repository with the same name exists or an error occurred.
        """
        try:
            # Names are the lookup key, so they must stay unique.
            if any(
                existing["name"] == repo.name
                for existing in self.repositories["repositories"]
            ):
                print(f"Repository {repo.name} already exists")
                return False

            self.repositories["repositories"].append(asdict(repo))
            self._save_repositories()
            return True
        except Exception as e:
            print(f"Failed to add repository: {e}")
            return False

    def remove_repository(self, name: str) -> bool:
        """Remove a repository by name and persist the configuration.

        Returns:
            True when the configuration was saved, including the case where
            no repository carried that name; False when an error occurred.
        """
        try:
            self.repositories["repositories"] = [
                repo for repo in self.repositories["repositories"]
                if repo["name"] != name
            ]
            self._save_repositories()
            return True
        except Exception as e:
            print(f"Failed to remove repository: {e}")
            return False

    def update_repository(self, name: str, **kwargs) -> bool:
        """Update fields of the repository called ``name``.

        A keyword is applied when the stored entry already has that key or
        when it is one of the known repository fields (so optional fields
        such as ``proxy`` can be set on entries created without them).
        Any other keyword is ignored.

        Returns:
            True when the repository was found and saved; False when it was
            not found or an error occurred.
        """
        try:
            for repo in self.repositories["repositories"]:
                if repo["name"] != name:
                    continue
                for key, value in kwargs.items():
                    if key in repo or key in self._REPOSITORY_FIELDS:
                        repo[key] = value
                self._save_repositories()
                return True

            print(f"Repository {name} not found")
            return False
        except Exception as e:
            print(f"Failed to update repository: {e}")
            return False

    def get_repository(self, name: str) -> Optional[Dict[str, Any]]:
        """Get repository configuration by name, or None when absent."""
        for repo in self.repositories["repositories"]:
            if repo["name"] == name:
                return repo
        return None

    def list_repositories(self) -> List[Dict[str, Any]]:
        """List all repositories (the live internal list, not a copy)."""
        return self.repositories["repositories"]

    def get_enabled_repositories(self) -> List[Dict[str, Any]]:
        """Get all enabled repositories.

        An entry without an ``enabled`` key counts as enabled, matching the
        default of the DebianRepository dataclass.
        """
        return [
            repo for repo in self.repositories["repositories"]
            if repo.get("enabled", True)
        ]

    def add_mirror(self, mirror: "RepositoryMirror") -> bool:
        """Add a new mirror and persist the configuration.

        Args:
            mirror: Dataclass instance; it is stored as ``asdict(mirror)``.

        Returns:
            True when the mirror was added and saved; False when a mirror
            with the same name exists or an error occurred.
        """
        try:
            # Names are the lookup key, so they must stay unique.
            if any(
                existing["name"] == mirror.name
                for existing in self.mirrors["mirrors"]
            ):
                print(f"Mirror {mirror.name} already exists")
                return False

            self.mirrors["mirrors"].append(asdict(mirror))
            self._save_mirrors()
            return True
        except Exception as e:
            print(f"Failed to add mirror: {e}")
            return False

    def remove_mirror(self, name: str) -> bool:
        """Remove a mirror by name and persist the configuration.

        Returns:
            True when the configuration was saved, including the case where
            no mirror carried that name; False when an error occurred.
        """
        try:
            self.mirrors["mirrors"] = [
                mirror for mirror in self.mirrors["mirrors"]
                if mirror["name"] != name
            ]
            self._save_mirrors()
            return True
        except Exception as e:
            print(f"Failed to remove mirror: {e}")
            return False

    def list_mirrors(self) -> List[Dict[str, Any]]:
        """List all mirrors (the live internal list, not a copy)."""
        return self.mirrors["mirrors"]

    def get_enabled_mirrors(self) -> List[Dict[str, Any]]:
        """Get all enabled mirrors.

        An entry without an ``enabled`` key counts as enabled, matching the
        default of the RepositoryMirror dataclass.
        """
        return [
            mirror for mirror in self.mirrors["mirrors"]
            if mirror.get("enabled", True)
        ]

    def _health_check_url(self, mirror: Dict[str, Any]) -> str:
        """Build the Release-file URL used to probe ``mirror``.

        The suite is taken from the first configured repository that shares
        the mirror's URL, because not every archive publishes the default
        suite (the bundled security repository uses "bookworm-security").
        When no repository matches, the default suite is used.
        """
        # Strip a trailing slash so the joined path never contains "//".
        base_url = mirror["url"].rstrip("/")
        suite = next(
            (
                repo["suite"]
                for repo in self.repositories["repositories"]
                if "suite" in repo
                and str(repo.get("url", "")).rstrip("/") == base_url
            ),
            self._get_default_suite(),
        )
        return f"{base_url}/dists/{suite}/Release"

    def check_mirror_health(self, mirror_name: str) -> bool:
        """Check if a mirror is healthy.

        Returns:
            False when the mirror is unknown or the probe fails; True when
            health checking is disabled for the mirror or when its Release
            file answers with HTTP status 200 within 10 seconds.
        """
        try:
            mirror = next(
                (m for m in self.mirrors["mirrors"] if m["name"] == mirror_name),
                None,
            )
            if not mirror:
                return False

            # A missing key means "check", as in the RepositoryMirror default.
            if not mirror.get("health_check", True):
                return True

            # Simple health check - try to access the mirror
            test_url = self._health_check_url(mirror)

            import urllib.request
            try:
                with urllib.request.urlopen(test_url, timeout=10) as response:
                    return response.status == 200
            except Exception:
                # Any network/HTTP/URL failure means "unhealthy". Unlike a
                # bare except, KeyboardInterrupt and SystemExit propagate.
                return False
        except Exception as e:
            print(f"Health check failed for {mirror_name}: {e}")
            return False

    def _get_default_suite(self) -> str:
        """Get default Debian suite."""
        return "bookworm"

    def generate_sources_list(self, suite: str, components: Optional[List[str]] = None) -> str:
        """Generate sources.list content for a specific suite.

        One ``deb <url> <suite> <component>`` line is produced for every
        requested component that an enabled repository of that suite offers.

        Args:
            suite: Suite the repository must match exactly.
            components: Requested components; defaults to ``["main"]``.

        Returns:
            The lines joined by newlines ("" when nothing matches).
        """
        if components is None:
            components = ["main"]

        sources_list = [
            f"deb {repo['url']} {repo['suite']} {component}"
            for repo in self.get_enabled_repositories()
            if repo.get("suite") == suite
            for component in components
            if component in repo.get("components", [])
        ]
        return "\n".join(sources_list)

    def generate_apt_config(self, suite: str, proxy: Optional[str] = None) -> Dict[str, Any]:
        """Generate APT configuration for composer.

        Returns:
            A dict with ``sources`` (enabled repositories of ``suite`` keyed
            by name), an empty ``preferences`` dict and the given ``proxy``.
        """
        config = {
            "sources": {},
            "preferences": {},
            "proxy": proxy
        }

        # Generate sources
        for repo in self.get_enabled_repositories():
            if repo.get("suite") != suite:
                continue
            config["sources"][repo["name"]] = {
                "url": repo["url"],
                "suite": repo["suite"],
                "components": repo.get("components", []),
                # 500 is the DebianRepository default priority.
                "priority": repo.get("priority", 500)
            }
        return config

    def validate_repository_config(self) -> List[str]:
        """Validate repository configuration and return errors.

        Checks every stored repository for the required fields, a URL with
        both scheme and host, and a list-typed ``components`` value.

        Returns:
            Human-readable error messages; empty when everything is valid.
        """
        errors = []
        required_fields = ["name", "url", "suite", "components"]

        for repo in self.repositories["repositories"]:
            label = repo.get('name', 'unknown')

            # Check required fields
            for field in required_fields:
                if field not in repo:
                    errors.append(f"Repository {label} missing {field}")

            # Check URL format
            if "url" in repo:
                try:
                    parsed = urllib.parse.urlparse(repo["url"])
                    url_ok = bool(parsed.scheme and parsed.netloc)
                except (ValueError, TypeError, AttributeError):
                    # urlparse rejects malformed or non-string input.
                    url_ok = False
                if not url_ok:
                    errors.append(f"Repository {label} has invalid URL: {repo['url']}")

            # Check components
            if "components" in repo and not isinstance(repo["components"], list):
                errors.append(f"Repository {label} components must be a list")

        return errors

    def export_configuration(self, output_path: str) -> bool:
        """Export complete configuration to file.

        The file holds the repositories and mirrors dictionaries plus an
        ``exported_at`` timestamp and a ``version`` marker.

        Returns:
            True on success, False when writing failed.
        """
        try:
            config = {
                "repositories": self.repositories,
                "mirrors": self.mirrors,
                "exported_at": str(datetime.now()),
                "version": "1.0"
            }
            with open(output_path, 'w', encoding="utf-8") as f:
                json.dump(config, f, indent=2)
            return True
        except Exception as e:
            print(f"Failed to export configuration: {e}")
            return False

    @staticmethod
    def _coerce_section(data: Any, key: str) -> Dict[str, Any]:
        """Return ``data`` in the internal ``{key: [...]}`` shape.

        A bare list is wrapped; a dict must hold a list under ``key``.

        Raises:
            ValueError: If ``data`` has any other shape.
        """
        if isinstance(data, list):
            data = {key: data}
        if not isinstance(data, dict) or not isinstance(data.get(key), list):
            raise ValueError(f"'{key}' section must provide a list of entries")
        return data

    def import_configuration(self, config_path: str) -> bool:
        """Import configuration from file.

        Both sections are validated before anything is changed, so a
        malformed file leaves the in-memory and on-disk state untouched.
        Sections absent from the file are left as they are.

        Returns:
            True on success, False when reading, validating or saving failed.
        """
        try:
            with open(config_path, 'r', encoding="utf-8") as f:
                config = json.load(f)

            new_repositories = None
            new_mirrors = None
            if "repositories" in config:
                new_repositories = self._coerce_section(
                    config["repositories"], "repositories"
                )
            if "mirrors" in config:
                new_mirrors = self._coerce_section(config["mirrors"], "mirrors")

            if new_repositories is not None:
                self.repositories = new_repositories
                self._save_repositories()
            if new_mirrors is not None:
                self.mirrors = new_mirrors
                self._save_mirrors()
            return True
        except Exception as e:
            print(f"Failed to import configuration: {e}")
            return False
def main():
    """Demonstrate the manager by printing its repositories and mirrors.

    Builds a DebianRepositoryManager with its default configuration
    directory, then writes one line per repository and per mirror to stdout.
    """
    print("Debian Repository Manager Example")

    # Constructing the manager loads (or creates) the persisted configuration.
    manager = DebianRepositoryManager()

    # Repositories: name, URL and suite
    print("\nCurrent repositories:")
    for entry in manager.list_repositories():
        repo_name = entry['name']
        repo_url = entry['url']
        repo_suite = entry['suite']
        print(f" - {repo_name}: {repo_url} ({repo_suite})")

    # Mirrors: name and URL
    print("\nCurrent mirrors:")
    for entry in manager.list_mirrors():
        mirror_name = entry['name']
        mirror_url = entry['url']
        print(f" - {mirror_name}: {mirror_url}")


# Run the demonstration only when executed as a script, not on import.
if __name__ == '__main__':
    main()