394 lines
14 KiB
Python
394 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Debian Repository Manager for Debian Forge
|
|
|
|
This module provides Debian repository management for OSBuild Composer,
|
|
handling repository configuration, mirror management, and package sources.
|
|
"""
|
|
|
|
import json
import os
import subprocess
import tempfile
import urllib.parse
import urllib.request
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
|
|
|
|
@dataclass
class DebianRepository:
    """Configuration record for one Debian package repository.

    The first four fields are required and positional; the remaining ones
    carry defaults, so ``DebianRepository(name, url, suite, components)`` is
    the minimal construction.

    Attributes:
        name: Identifier of the repository entry.
        url: Base URL of the repository.
        suite: Suite the entry points at (for example ``"bookworm"``).
        components: Component names (for example ``["main"]``).
        enabled: Whether the entry is active; defaults to ``True``.
        priority: Integer priority; defaults to ``500``.
        authentication: Optional string-to-string mapping; defaults to ``None``.
        proxy: Optional proxy string; defaults to ``None``.
    """

    # Required fields (no defaults).
    name: str
    url: str
    suite: str
    components: List[str]

    # Optional fields.
    enabled: bool = True
    priority: int = 500
    authentication: Optional[Dict[str, str]] = None
    proxy: Optional[str] = None
|
|
|
|
@dataclass
class RepositoryMirror:
    """Configuration record for one Debian mirror.

    ``name``, ``url`` and ``region`` are required and positional; the other
    fields have defaults.

    Attributes:
        name: Identifier of the mirror entry.
        url: Base URL of the mirror.
        region: Region label (for example ``"global"``).
        protocol: Protocol label; defaults to ``"http"``.
        enabled: Whether the mirror is active; defaults to ``True``.
        health_check: Whether the mirror should be probed by health checks;
            defaults to ``True``.
    """

    # Required fields (no defaults).
    name: str
    url: str
    region: str

    # Optional fields.
    protocol: str = "http"
    enabled: bool = True
    health_check: bool = True
|
|
|
|
class DebianRepositoryManager:
    """Manages Debian repositories for composer builds.

    Repository and mirror definitions are held in memory as
    ``{"repositories": [...]}`` and ``{"mirrors": [...]}`` dictionaries of
    plain dicts, and are persisted as ``repositories.json`` and
    ``mirrors.json`` inside ``config_dir``.

    Mutating methods report failure by printing a message and returning
    ``False`` instead of raising.
    """

    # Keys update_repository() may set even when the stored entry does not
    # carry them yet: the built-in default entries omit "authentication" and
    # "proxy", which previously made those two fields impossible to set.
    _REPOSITORY_FIELDS = frozenset({
        "name", "url", "suite", "components",
        "enabled", "priority", "authentication", "proxy",
    })

    def __init__(self, config_dir: str = "/etc/debian-forge/repositories"):
        """Create ``config_dir`` if needed and load (or initialise) its files.

        Args:
            config_dir: Directory holding ``repositories.json`` and
                ``mirrors.json``.

        Raises:
            OSError: If the directory cannot be created or a file cannot be
                read or written.
            json.JSONDecodeError: If an existing configuration file is not
                valid JSON.
        """
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(parents=True, exist_ok=True)
        self.repositories_file = self.config_dir / "repositories.json"
        self.mirrors_file = self.config_dir / "mirrors.json"
        self._load_configuration()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _load_configuration(self):
        """Load repository and mirror configuration.

        A missing file is replaced by the built-in defaults, which are
        written to disk immediately.
        """
        # Load repositories
        if self.repositories_file.exists():
            with open(self.repositories_file, 'r', encoding="utf-8") as f:
                self.repositories = json.load(f)
        else:
            self.repositories = self._get_default_repositories()
            self._save_repositories()

        # Load mirrors
        if self.mirrors_file.exists():
            with open(self.mirrors_file, 'r', encoding="utf-8") as f:
                self.mirrors = json.load(f)
        else:
            self.mirrors = self._get_default_mirrors()
            self._save_mirrors()

    def _get_default_repositories(self) -> Dict[str, Any]:
        """Get default Debian repository configuration (bookworm suites)."""
        return {
            "repositories": [
                {
                    "name": "debian-main",
                    "url": "http://deb.debian.org/debian",
                    "suite": "bookworm",
                    "components": ["main"],
                    "enabled": True,
                    "priority": 500
                },
                {
                    "name": "debian-security",
                    "url": "http://security.debian.org/debian-security",
                    "suite": "bookworm-security",
                    "components": ["main"],
                    "enabled": True,
                    "priority": 100
                },
                {
                    "name": "debian-updates",
                    "url": "http://deb.debian.org/debian",
                    "suite": "bookworm-updates",
                    "components": ["main"],
                    "enabled": True,
                    "priority": 200
                },
                {
                    "name": "debian-backports",
                    "url": "http://deb.debian.org/debian",
                    "suite": "bookworm-backports",
                    "components": ["main", "contrib", "non-free-firmware"],
                    "enabled": False,
                    "priority": 300
                }
            ]
        }

    def _get_default_mirrors(self) -> Dict[str, Any]:
        """Get default Debian mirror configuration."""
        return {
            "mirrors": [
                {
                    "name": "debian-official",
                    "url": "http://deb.debian.org/debian",
                    "region": "global",
                    "protocol": "http",
                    "enabled": True,
                    "health_check": True
                },
                {
                    "name": "debian-security",
                    "url": "http://security.debian.org/debian-security",
                    "region": "global",
                    "protocol": "http",
                    "enabled": True,
                    "health_check": True
                }
            ]
        }

    def _save_repositories(self):
        """Save repository configuration to ``repositories.json``."""
        with open(self.repositories_file, 'w', encoding="utf-8") as f:
            json.dump(self.repositories, f, indent=2)

    def _save_mirrors(self):
        """Save mirror configuration to ``mirrors.json``."""
        with open(self.mirrors_file, 'w', encoding="utf-8") as f:
            json.dump(self.mirrors, f, indent=2)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _find_by_name(entries: List[Dict[str, Any]], name: str) -> Optional[Dict[str, Any]]:
        """Return the first entry whose ``"name"`` equals ``name``, else ``None``."""
        return next((entry for entry in entries if entry.get("name") == name), None)

    @staticmethod
    def _is_valid_url(url: Any) -> bool:
        """Return True when ``url`` is a string with both a scheme and a host."""
        if not isinstance(url, str):
            return False
        try:
            parsed = urllib.parse.urlparse(url)
        except ValueError:
            # urlparse rejects some malformed inputs (e.g. a bad IPv6 literal).
            return False
        return bool(parsed.scheme and parsed.netloc)

    @staticmethod
    def _normalize_section(section: Any, key: str) -> Dict[str, Any]:
        """Coerce an imported section into the ``{key: [...]}`` shape.

        Accepts the wrapped form written by ``export_configuration`` as well
        as a bare list of entries.

        Raises:
            ValueError: If ``section`` has neither shape.
        """
        if isinstance(section, list):
            return {key: section}
        if isinstance(section, dict) and isinstance(section.get(key), list):
            return section
        raise ValueError(f"'{key}' section must be a list or contain a '{key}' list")

    # ------------------------------------------------------------------
    # Repositories
    # ------------------------------------------------------------------

    def add_repository(self, repo: "DebianRepository") -> bool:
        """Add a new repository and persist the change.

        Args:
            repo: Repository to add; stored as ``asdict(repo)``.

        Returns:
            True on success; False if a repository with the same name exists
            or saving failed (the reason is printed).
        """
        try:
            # Check if repository already exists
            if self._find_by_name(self.repositories["repositories"], repo.name) is not None:
                print(f"Repository {repo.name} already exists")
                return False

            # Add new repository
            self.repositories["repositories"].append(asdict(repo))
            self._save_repositories()
            return True

        except Exception as e:
            print(f"Failed to add repository: {e}")
            return False

    def remove_repository(self, name: str) -> bool:
        """Remove every repository named ``name`` and persist the change.

        Returns:
            True once the configuration is saved, including when no
            repository had that name; False if saving failed.
        """
        try:
            self.repositories["repositories"] = [
                repo for repo in self.repositories["repositories"]
                if repo["name"] != name
            ]
            self._save_repositories()
            return True

        except Exception as e:
            print(f"Failed to remove repository: {e}")
            return False

    def update_repository(self, name: str, **kwargs) -> bool:
        """Update fields of the repository named ``name`` and persist them.

        A keyword is applied when the stored entry already has that key or
        when it is a known repository field. The second rule lets optional
        fields absent from the stored entry (such as ``proxy``) be set.
        Other keywords are reported and skipped.

        Returns:
            True if the repository was found and saved; False if it was not
            found or saving failed.
        """
        try:
            repo = self._find_by_name(self.repositories["repositories"], name)
            if repo is None:
                print(f"Repository {name} not found")
                return False

            for key, value in kwargs.items():
                if key in repo or key in self._REPOSITORY_FIELDS:
                    repo[key] = value
                else:
                    print(f"Ignoring unknown repository field: {key}")

            self._save_repositories()
            return True

        except Exception as e:
            print(f"Failed to update repository: {e}")
            return False

    def get_repository(self, name: str) -> Optional[Dict[str, Any]]:
        """Get repository configuration by name, or ``None`` if absent.

        The returned dict is the stored object itself, not a copy.
        """
        return self._find_by_name(self.repositories["repositories"], name)

    def list_repositories(self) -> List[Dict[str, Any]]:
        """List all repositories (the stored list itself, not a copy)."""
        return self.repositories["repositories"]

    def get_enabled_repositories(self) -> List[Dict[str, Any]]:
        """Get all enabled repositories.

        An entry without an ``"enabled"`` key counts as enabled, matching the
        ``DebianRepository`` default.
        """
        return [
            repo for repo in self.repositories["repositories"]
            if repo.get("enabled", True)
        ]

    # ------------------------------------------------------------------
    # Mirrors
    # ------------------------------------------------------------------

    def add_mirror(self, mirror: "RepositoryMirror") -> bool:
        """Add a new mirror and persist the change.

        Args:
            mirror: Mirror to add; stored as ``asdict(mirror)``.

        Returns:
            True on success; False if a mirror with the same name exists or
            saving failed (the reason is printed).
        """
        try:
            # Check if mirror already exists
            if self._find_by_name(self.mirrors["mirrors"], mirror.name) is not None:
                print(f"Mirror {mirror.name} already exists")
                return False

            # Add new mirror
            self.mirrors["mirrors"].append(asdict(mirror))
            self._save_mirrors()
            return True

        except Exception as e:
            print(f"Failed to add mirror: {e}")
            return False

    def remove_mirror(self, name: str) -> bool:
        """Remove every mirror named ``name`` and persist the change.

        Returns:
            True once the configuration is saved, including when no mirror
            had that name; False if saving failed.
        """
        try:
            self.mirrors["mirrors"] = [
                mirror for mirror in self.mirrors["mirrors"]
                if mirror["name"] != name
            ]
            self._save_mirrors()
            return True

        except Exception as e:
            print(f"Failed to remove mirror: {e}")
            return False

    def list_mirrors(self) -> List[Dict[str, Any]]:
        """List all mirrors (the stored list itself, not a copy)."""
        return self.mirrors["mirrors"]

    def get_enabled_mirrors(self) -> List[Dict[str, Any]]:
        """Get all enabled mirrors.

        An entry without an ``"enabled"`` key counts as enabled, matching the
        ``RepositoryMirror`` default.
        """
        return [
            mirror for mirror in self.mirrors["mirrors"]
            if mirror.get("enabled", True)
        ]

    def check_mirror_health(self, mirror_name: str) -> bool:
        """Check if a mirror is healthy.

        The probe requests ``<url>/dists/<default suite>/Release`` with a
        10-second timeout and treats HTTP status 200 as healthy.

        Returns:
            False if the mirror is unknown or the probe fails; True without
            any request if the mirror has ``health_check`` disabled;
            otherwise the probe result.
        """
        try:
            mirror = self._find_by_name(self.mirrors["mirrors"], mirror_name)
            if not mirror:
                return False

            if not mirror.get("health_check", True):
                return True

            # Simple health check - try to access the mirror.
            # Strip a trailing slash so the URL never contains "//dists".
            # NOTE(review): the probe always uses the default suite; the
            # default "debian-security" mirror presumably publishes
            # "bookworm-security" rather than "bookworm" - confirm whether
            # this probe can ever succeed for it.
            base_url = mirror['url'].rstrip('/')
            test_url = f"{base_url}/dists/{self._get_default_suite()}/Release"

            try:
                with urllib.request.urlopen(test_url, timeout=10) as response:
                    return response.status == 200
            except (OSError, ValueError):
                # URLError/HTTPError and timeouts are OSError subclasses;
                # ValueError covers an unparseable URL. Anything else falls
                # through to the outer handler and is reported.
                return False

        except Exception as e:
            print(f"Health check failed for {mirror_name}: {e}")
            return False

    def _get_default_suite(self) -> str:
        """Get default Debian suite."""
        return "bookworm"

    # ------------------------------------------------------------------
    # Generated output
    # ------------------------------------------------------------------

    def generate_sources_list(self, suite: str, components: Optional[List[str]] = None) -> str:
        """Generate sources.list content for a specific suite.

        Args:
            suite: Only enabled repositories with exactly this suite are used.
            components: Components to emit; defaults to ``["main"]``. A
                component is emitted only if the repository lists it.

        Returns:
            Newline-joined ``deb <url> <suite> <component>`` lines (an empty
            string when nothing matches).
        """
        if components is None:
            components = ["main"]

        sources_list = [
            f"deb {repo['url']} {repo['suite']} {component}"
            for repo in self.get_enabled_repositories()
            if repo["suite"] == suite
            for component in components
            if component in repo["components"]
        ]

        return "\n".join(sources_list)

    def generate_apt_config(self, suite: str, proxy: Optional[str] = None) -> Dict[str, Any]:
        """Generate APT configuration for composer.

        Args:
            suite: Only enabled repositories with exactly this suite are used.
            proxy: Value copied verbatim into the ``"proxy"`` key.

        Returns:
            A dict with ``"sources"`` (keyed by repository name),
            ``"preferences"`` (always empty here) and ``"proxy"``.
        """
        config = {
            "sources": {},
            "preferences": {},
            "proxy": proxy
        }

        # Generate sources
        for repo in self.get_enabled_repositories():
            if repo["suite"] == suite:
                config["sources"][repo["name"]] = {
                    "url": repo["url"],
                    "suite": repo["suite"],
                    "components": repo["components"],
                    # 500 mirrors the DebianRepository default for entries
                    # that omit the key.
                    "priority": repo.get("priority", 500)
                }

        return config

    def validate_repository_config(self) -> List[str]:
        """Validate repository configuration and return errors.

        Checks each repository for the required keys, a URL with scheme and
        host, and a list-typed ``components`` value.

        Returns:
            Human-readable error messages; an empty list means valid.
        """
        errors = []
        required_fields = ("name", "url", "suite", "components")

        for repo in self.repositories["repositories"]:
            label = repo.get('name', 'unknown')

            # Check required fields
            for required in required_fields:
                if required not in repo:
                    errors.append(f"Repository {label} missing {required}")

            # Check URL format
            if "url" in repo and not self._is_valid_url(repo["url"]):
                errors.append(f"Repository {label} has invalid URL: {repo['url']}")

            # Check components
            if "components" in repo and not isinstance(repo["components"], list):
                errors.append(f"Repository {label} components must be a list")

        return errors

    # ------------------------------------------------------------------
    # Import / export
    # ------------------------------------------------------------------

    def export_configuration(self, output_path: str) -> bool:
        """Export complete configuration to file.

        The file holds the repository and mirror dictionaries plus an
        ``exported_at`` timestamp and a ``version`` string.

        Returns:
            True on success, False if writing failed (the reason is printed).
        """
        try:
            config = {
                "repositories": self.repositories,
                "mirrors": self.mirrors,
                "exported_at": str(datetime.now()),
                "version": "1.0"
            }

            with open(output_path, 'w', encoding="utf-8") as f:
                json.dump(config, f, indent=2)

            return True

        except Exception as e:
            print(f"Failed to export configuration: {e}")
            return False

    def import_configuration(self, config_path: str) -> bool:
        """Import configuration from file.

        Each of the ``"repositories"`` and ``"mirrors"`` sections may be the
        wrapped dict written by ``export_configuration`` or a bare list of
        entries. Both sections are checked before either is applied, so a
        malformed file leaves the current configuration and the files on
        disk untouched.

        Returns:
            True on success; False if the file cannot be read, is not valid
            JSON, or has an unexpected shape (the reason is printed).
        """
        try:
            with open(config_path, 'r', encoding="utf-8") as f:
                config = json.load(f)

            if not isinstance(config, dict):
                raise ValueError("configuration must be a JSON object")

            # Validate everything first; only then mutate state and disk.
            repositories = None
            if "repositories" in config:
                repositories = self._normalize_section(config["repositories"], "repositories")

            mirrors = None
            if "mirrors" in config:
                mirrors = self._normalize_section(config["mirrors"], "mirrors")

            if repositories is not None:
                self.repositories = repositories
                self._save_repositories()

            if mirrors is not None:
                self.mirrors = mirrors
                self._save_mirrors()

            return True

        except Exception as e:
            print(f"Failed to import configuration: {e}")
            return False
|
|
|
|
def main():
    """Example usage: print the repositories and mirrors of a default manager."""
    print("Debian Repository Manager Example")

    # Uses the default configuration directory.
    manager = DebianRepositoryManager()

    # Repositories: one line each with name, URL and suite.
    print("\nCurrent repositories:")
    for entry in manager.list_repositories():
        name, url, suite = entry['name'], entry['url'], entry['suite']
        print(f" - {name}: {url} ({suite})")

    # Mirrors: one line each with name and URL.
    print("\nCurrent mirrors:")
    for entry in manager.list_mirrors():
        name, url = entry['name'], entry['url']
        print(f" - {name}: {url}")


if __name__ == '__main__':
    main()
|