#!/usr/bin/env python3 """ Debian Repository Manager for Debian Forge This module provides Debian repository management for OSBuild Composer, handling repository configuration, mirror management, and package sources. """ import json import os import subprocess import tempfile from typing import Dict, List, Optional, Any from dataclasses import dataclass, asdict from pathlib import Path import urllib.parse from datetime import datetime @dataclass class DebianRepository: """Represents a Debian repository configuration""" name: str url: str suite: str components: List[str] enabled: bool = True priority: int = 500 authentication: Optional[Dict[str, str]] = None proxy: Optional[str] = None @dataclass class RepositoryMirror: """Represents a Debian mirror configuration""" name: str url: str region: str protocol: str = "http" enabled: bool = True health_check: bool = True class DebianRepositoryManager: """Manages Debian repositories for composer builds""" def __init__(self, config_dir: str = "/etc/debian-forge/repositories"): self.config_dir = Path(config_dir) self.config_dir.mkdir(parents=True, exist_ok=True) self.repositories_file = self.config_dir / "repositories.json" self.mirrors_file = self.config_dir / "mirrors.json" self._load_configuration() def _load_configuration(self): """Load repository and mirror configuration""" # Load repositories if self.repositories_file.exists(): with open(self.repositories_file, 'r') as f: self.repositories = json.load(f) else: self.repositories = self._get_default_repositories() self._save_repositories() # Load mirrors if self.mirrors_file.exists(): with open(self.mirrors_file, 'r') as f: self.mirrors = json.load(f) else: self.mirrors = self._get_default_mirrors() self._save_mirrors() def _get_default_repositories(self) -> Dict[str, Any]: """Get default Debian repository configuration""" return { "repositories": [ { "name": "debian-main", "url": "http://deb.debian.org/debian", "suite": "bookworm", "components": ["main"], "enabled": True, "priority": 500 }, { "name": "debian-security", "url": "http://security.debian.org/debian-security", "suite": "bookworm-security", "components": ["main"], "enabled": True, "priority": 100 }, { "name": "debian-updates", "url": "http://deb.debian.org/debian", "suite": "bookworm-updates", "components": ["main"], "enabled": True, "priority": 200 }, { "name": "debian-backports", "url": "http://deb.debian.org/debian", "suite": "bookworm-backports", "components": ["main", "contrib", "non-free-firmware"], "enabled": False, "priority": 300 } ] } def _get_default_mirrors(self) -> Dict[str, Any]: """Get default Debian mirror configuration""" return { "mirrors": [ { "name": "debian-official", "url": "http://deb.debian.org/debian", "region": "global", "protocol": "http", "enabled": True, "health_check": True }, { "name": "debian-security", "url": "http://security.debian.org/debian-security", "region": "global", "protocol": "http", "enabled": True, "health_check": True } ] } def _save_repositories(self): """Save repository configuration to file""" with open(self.repositories_file, 'w') as f: json.dump(self.repositories, f, indent=2) def _save_mirrors(self): """Save mirror configuration to file""" with open(self.mirrors_file, 'w') as f: json.dump(self.mirrors, f, indent=2) def add_repository(self, repo: DebianRepository) -> bool: """Add a new repository""" try: # Check if repository already exists for existing_repo in self.repositories["repositories"]: if existing_repo["name"] == repo.name: print(f"Repository {repo.name} already exists") return False # Add new repository self.repositories["repositories"].append(asdict(repo)) self._save_repositories() return True except Exception as e: print(f"Failed to add repository: {e}") return False def remove_repository(self, name: str) -> bool: """Remove a repository by name""" try: self.repositories["repositories"] = [ repo for repo in self.repositories["repositories"] if repo["name"] != name ] self._save_repositories() return True except Exception as e: print(f"Failed to remove repository: {e}") return False def update_repository(self, name: str, **kwargs) -> bool: """Update repository configuration""" try: for repo in self.repositories["repositories"]: if repo["name"] == name: for key, value in kwargs.items(): if key in repo: repo[key] = value self._save_repositories() return True print(f"Repository {name} not found") return False except Exception as e: print(f"Failed to update repository: {e}") return False def get_repository(self, name: str) -> Optional[Dict[str, Any]]: """Get repository configuration by name""" for repo in self.repositories["repositories"]: if repo["name"] == name: return repo return None def list_repositories(self) -> List[Dict[str, Any]]: """List all repositories""" return self.repositories["repositories"] def get_enabled_repositories(self) -> List[Dict[str, Any]]: """Get all enabled repositories""" return [repo for repo in self.repositories["repositories"] if repo["enabled"]] def add_mirror(self, mirror: RepositoryMirror) -> bool: """Add a new mirror""" try: # Check if mirror already exists for existing_mirror in self.mirrors["mirrors"]: if existing_mirror["name"] == mirror.name: print(f"Mirror {mirror.name} already exists") return False # Add new mirror self.mirrors["mirrors"].append(asdict(mirror)) self._save_mirrors() return True except Exception as e: print(f"Failed to add mirror: {e}") return False def remove_mirror(self, name: str) -> bool: """Remove a mirror by name""" try: self.mirrors["mirrors"] = [ mirror for mirror in self.mirrors["mirrors"] if mirror["name"] != name ] self._save_mirrors() return True except Exception as e: print(f"Failed to remove mirror: {e}") return False def list_mirrors(self) -> List[Dict[str, Any]]: """List all mirrors""" return self.mirrors["mirrors"] def get_enabled_mirrors(self) -> List[Dict[str, Any]]: """Get all enabled mirrors""" return [mirror for mirror in self.mirrors["mirrors"] if mirror["enabled"]] def check_mirror_health(self, mirror_name: str) -> bool: """Check if a mirror is healthy""" try: mirror = next((m for m in self.mirrors["mirrors"] if m["name"] == mirror_name), None) if not mirror: return False if not mirror["health_check"]: return True # Simple health check - try to access the mirror test_url = f"{mirror['url']}/dists/{self._get_default_suite()}/Release" import urllib.request try: with urllib.request.urlopen(test_url, timeout=10) as response: return response.status == 200 except: return False except Exception as e: print(f"Health check failed for {mirror_name}: {e}") return False def _get_default_suite(self) -> str: """Get default Debian suite""" return "bookworm" def generate_sources_list(self, suite: str, components: Optional[List[str]] = None) -> str: """Generate sources.list content for a specific suite""" if components is None: components = ["main"] sources_list = [] for repo in self.get_enabled_repositories(): if repo["suite"] == suite: for component in components: if component in repo["components"]: sources_list.append( f"deb {repo['url']} {repo['suite']} {component}" ) return "\n".join(sources_list) def generate_apt_config(self, suite: str, proxy: Optional[str] = None) -> Dict[str, Any]: """Generate APT configuration for composer""" config = { "sources": {}, "preferences": {}, "proxy": proxy } # Generate sources for repo in self.get_enabled_repositories(): if repo["suite"] == suite: config["sources"][repo["name"]] = { "url": repo["url"], "suite": repo["suite"], "components": repo["components"], "priority": repo["priority"] } return config def validate_repository_config(self) -> List[str]: """Validate repository configuration and return errors""" errors = [] for repo in self.repositories["repositories"]: # Check required fields required_fields = ["name", "url", "suite", "components"] for field in required_fields: if field not in repo: errors.append(f"Repository {repo.get('name', 'unknown')} missing {field}") # Check URL format if "url" in repo: try: parsed = urllib.parse.urlparse(repo["url"]) if not parsed.scheme or not parsed.netloc: errors.append(f"Repository {repo.get('name', 'unknown')} has invalid URL: {repo['url']}") except: errors.append(f"Repository {repo.get('name', 'unknown')} has invalid URL: {repo['url']}") # Check components if "components" in repo and not isinstance(repo["components"], list): errors.append(f"Repository {repo.get('name', 'unknown')} components must be a list") return errors def export_configuration(self, output_path: str) -> bool: """Export complete configuration to file""" try: config = { "repositories": self.repositories, "mirrors": self.mirrors, "exported_at": str(datetime.now()), "version": "1.0" } with open(output_path, 'w') as f: json.dump(config, f, indent=2) return True except Exception as e: print(f"Failed to export configuration: {e}") return False def import_configuration(self, config_path: str) -> bool: """Import configuration from file""" try: with open(config_path, 'r') as f: config = json.load(f) if "repositories" in config: self.repositories = config["repositories"] self._save_repositories() if "mirrors" in config: self.mirrors = config["mirrors"] self._save_mirrors() return True except Exception as e: print(f"Failed to import configuration: {e}") return False def main(): """Example usage of Debian repository manager""" print("Debian Repository Manager Example") # Create manager manager = DebianRepositoryManager() # List repositories print("\nCurrent repositories:") for repo in manager.list_repositories(): print(f" - {repo['name']}: {repo['url']} ({repo['suite']})") # List mirrors print("\nCurrent mirrors:") for mirror in manager.list_mirrors(): print(f" - {mirror['name']}: {mirror['url']}") if __name__ == '__main__': main()