#!/usr/bin/env python3
"""
Debian Package Metadata Synchronization

This module handles synchronization of package metadata from Debian
repositories, including package lists, dependency information, and
version tracking.
"""

import contextlib
import gzip
import hashlib
import json
import os
import shutil
import sqlite3
import subprocess
import tempfile
import urllib.parse
import urllib.request
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Set


@dataclass
class PackageMetadata:
    """Represents package metadata from Debian repositories"""

    name: str
    version: str
    architecture: str
    suite: str
    component: str
    # Relationship fields hold one entry per comma-separated item of the
    # corresponding control field (e.g. "libc6 (>= 2.0)").
    depends: List[str]
    recommends: List[str]
    suggests: List[str]
    conflicts: List[str]
    breaks: List[str]
    replaces: List[str]
    provides: List[str]
    essential: bool
    priority: str
    size: int
    md5sum: str
    sha256: str
    description: str
    last_updated: datetime


class DebianPackageMetadataSync:
    """Synchronizes package metadata from Debian repositories"""

    # Seconds to wait on any single network operation before giving up.
    REQUEST_TIMEOUT = 30

    # Column order of the ``packages`` table. Shared by INSERT and SELECT so
    # the two can never drift apart.
    _COLUMNS = (
        'name', 'version', 'architecture', 'suite', 'component',
        'depends', 'recommends', 'suggests', 'conflicts', 'breaks',
        'replaces', 'provides', 'essential', 'priority', 'size',
        'md5sum', 'sha256', 'description', 'last_updated',
    )

    # Control fields (lower-cased) whose value is a comma-separated list.
    _LIST_FIELDS = frozenset({
        'depends', 'recommends', 'suggests', 'conflicts',
        'breaks', 'replaces', 'provides',
    })

    def __init__(self, cache_dir: str = "./cache/metadata"):
        """Create the cache directory (if needed) and the SQLite schema.

        Args:
            cache_dir: Directory holding downloaded ``Packages.gz`` files,
                the SQLite database and the last-sync marker file.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self.metadata_db = self.cache_dir / "packages.db"
        self.last_sync_file = self.cache_dir / "last_sync.json"

        self._init_database()

    def _init_database(self):
        """Initialize SQLite database for package metadata"""
        # closing() guarantees the connection is released even if a DDL
        # statement fails; ``with conn`` commits or rolls back.
        with contextlib.closing(sqlite3.connect(self.metadata_db)) as conn:
            with conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS packages (
                        name TEXT,
                        version TEXT,
                        architecture TEXT,
                        suite TEXT,
                        component TEXT,
                        depends TEXT,
                        recommends TEXT,
                        suggests TEXT,
                        conflicts TEXT,
                        breaks TEXT,
                        replaces TEXT,
                        provides TEXT,
                        essential BOOLEAN,
                        priority TEXT,
                        size INTEGER,
                        md5sum TEXT,
                        sha256 TEXT,
                        description TEXT,
                        last_updated TIMESTAMP,
                        PRIMARY KEY (name, version, architecture, suite)
                    )
                ''')
                conn.execute('''
                    CREATE INDEX IF NOT EXISTS idx_package_name
                    ON packages(name)
                ''')
                conn.execute('''
                    CREATE INDEX IF NOT EXISTS idx_suite_arch
                    ON packages(suite, architecture)
                ''')

    def sync_repository_metadata(self, repository_url: str, suite: str,
                                 components: List[str],
                                 architectures: List[str]) -> bool:
        """Sync package metadata from a Debian repository

        Every (component, architecture) pair is synced in turn; the first
        failure aborts the run.

        Args:
            repository_url: Base URL of the repository (the directory that
                contains ``dists/``).
            suite: Suite name, e.g. ``bookworm``.
            components: Components to sync, e.g. ``['main']``.
            architectures: Architectures to sync, e.g. ``['amd64']``.

        Returns:
            True when every pair synced and the last-sync marker was
            written, False otherwise.
        """
        try:
            print(f"Syncing metadata from {repository_url} for suite {suite}")

            for component in components:
                for arch in architectures:
                    success = self._sync_component_metadata(
                        repository_url, suite, component, arch
                    )
                    if not success:
                        print(f"Failed to sync {component}/{arch}")
                        return False

            self._update_last_sync(repository_url, suite)
            return True

        except Exception as e:
            # Top-level boundary: report and signal failure to the caller.
            print(f"Metadata sync failed: {e}")
            return False

    def _sync_component_metadata(self, repository_url: str, suite: str,
                                 component: str, architecture: str) -> bool:
        """Sync metadata for a specific component and architecture

        Returns:
            True only if the index was available locally (fresh or
            downloaded) AND its contents were written to the database.
        """
        try:
            # A trailing slash on the base URL would produce "//dists".
            base_url = repository_url.rstrip('/')
            packages_url = (
                f"{base_url}/dists/{suite}/{component}"
                f"/binary-{architecture}/Packages.gz"
            )
            packages_file = (
                self.cache_dir
                / f"Packages_{suite}_{component}_{architecture}.gz"
            )

            # Download if newer than local copy
            if not self._download_if_newer(packages_url, packages_file):
                return False

            # Parse and store metadata
            packages_data = self._parse_packages_file(packages_file)
            stored = self._store_packages_metadata(
                packages_data, suite, component, architecture
            )
            # A database failure must not be reported as a successful sync.
            return stored is not False

        except Exception as e:
            print(f"Component sync failed for {component}/{architecture}: {e}")
            return False

    def _download_if_newer(self, url: str, local_file: Path) -> bool:
        """Download file if it's newer than local copy

        Args:
            url: Remote location of the file.
            local_file: Destination path inside the cache.

        Returns:
            True if ``local_file`` is usable afterwards (already fresh or
            freshly downloaded), False if the download failed.
        """
        try:
            if local_file.exists():
                local_time = local_file.stat().st_mtime
                remote_time = self._get_remote_file_time(url)

                # NOTE: an unknown remote time is reported as 0, so the
                # cached copy is kept in that case (best-effort/offline use).
                if remote_time <= local_time:
                    print(f"Local file is up to date: {local_file.name}")
                    return True

            print(f"Downloading {url}")
            self._download_atomically(url, local_file)
            return True

        except Exception as e:
            # Best-effort boundary: any network/filesystem error is
            # reported through the return value.
            print(f"Download failed: {e}")
            return False

    def _download_atomically(self, url: str, local_file: Path) -> None:
        """Fetch ``url`` into ``local_file`` without exposing partial data.

        The payload is written to a temporary file in the same directory
        and moved into place with ``os.replace``. An interrupted transfer
        therefore never leaves a truncated file that a later run would
        mistake for an up-to-date copy.

        Raises:
            Whatever ``urllib.request.urlopen`` or the filesystem raises;
            the temporary file is removed first.
        """
        tmp_fd, tmp_name = tempfile.mkstemp(
            dir=local_file.parent,
            prefix=local_file.name + '.',
            suffix='.part',
        )
        try:
            with os.fdopen(tmp_fd, 'wb') as out, urllib.request.urlopen(
                    url, timeout=self.REQUEST_TIMEOUT) as response:
                shutil.copyfileobj(response, out)
            os.replace(tmp_name, local_file)
        except BaseException:
            with contextlib.suppress(OSError):
                os.unlink(tmp_name)
            raise

    def _get_remote_file_time(self, url: str) -> float:
        """Get last modified time of remote file

        Returns:
            POSIX timestamp taken from the ``Last-Modified`` header, or 0
            when the header is missing, unparsable, or the request fails.
        """
        try:
            req = urllib.request.Request(url, method='HEAD')
            with urllib.request.urlopen(
                    req, timeout=self.REQUEST_TIMEOUT) as response:
                last_modified = response.headers.get('Last-Modified')

            if not last_modified:
                return 0.0

            # HTTP dates are GMT. parsedate_to_datetime yields an aware
            # datetime, so timestamp() is not skewed by the local zone
            # (strptime with %Z returns a naive value that is).
            dt = parsedate_to_datetime(last_modified)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt.timestamp()

        except Exception:
            # Best-effort probe: "unknown" is reported as 0.
            return 0.0

    def _parse_packages_file(self, packages_file: Path) -> List[Dict[str, Any]]:
        """Parse Debian Packages.gz file

        The file is a sequence of stanzas separated by blank lines. Each
        stanza consists of ``Field: value`` lines; a line starting with a
        space or tab continues the previous field.

        Args:
            packages_file: Path to a gzip-compressed Packages index.

        Returns:
            One dict per stanza, keyed by lower-cased field name.
            Relationship fields are lists of strings, ``essential`` is a
            bool and ``size`` an int. An empty list is returned if the file
            cannot be read.
        """
        packages: List[Dict[str, Any]] = []
        fields: Dict[str, str] = {}
        last_key: Optional[str] = None

        try:
            with gzip.open(packages_file, 'rt', encoding='utf-8') as f:
                for raw_line in f:
                    line = raw_line.rstrip('\r\n')

                    # A blank line ends the current stanza.
                    if not line.strip():
                        if fields:
                            packages.append(self._convert_stanza(fields))
                        fields, last_key = {}, None
                        continue

                    # Continuation line: belongs to the previous field
                    # even when it contains a colon.
                    if line[0] in ' \t':
                        if last_key is not None:
                            fields[last_key] += '\n' + line.strip()
                        continue

                    key, sep, value = line.partition(':')
                    if not sep:
                        continue  # not a "Field: value" line; ignore
                    last_key = key.strip().lower()
                    fields[last_key] = value.strip()

            # Add last package (file may not end with a blank line)
            if fields:
                packages.append(self._convert_stanza(fields))

            return packages

        except Exception as e:
            print(f"Failed to parse packages file: {e}")
            return []

    @staticmethod
    def _convert_stanza(fields: Dict[str, str]) -> Dict[str, Any]:
        """Convert the raw string fields of one stanza to typed values."""
        package: Dict[str, Any] = {}
        for key, value in fields.items():
            if key in DebianPackageMetadataSync._LIST_FIELDS:
                package[key] = [
                    dep.strip() for dep in value.split(',') if dep.strip()
                ]
            elif key == 'essential':
                package[key] = value == 'yes'
            elif key == 'size':
                try:
                    package[key] = int(value)
                except ValueError:
                    # One bad stanza must not discard the whole index.
                    print(
                        f"Ignoring invalid Size {value!r} for package "
                        f"{fields.get('package', '')!r}"
                    )
            else:
                package[key] = value
        return package

    def _store_packages_metadata(self, packages: List[Dict[str, Any]],
                                 suite: str, component: str,
                                 architecture: str) -> bool:
        """Store package metadata in database

        All rows are written in a single transaction; on error nothing is
        committed.

        Args:
            packages: Parsed stanzas as produced by ``_parse_packages_file``.
            suite: Suite the packages belong to.
            component: Component the packages belong to.
            architecture: Architecture the packages belong to.

        Returns:
            True if the rows were committed, False on failure.
        """
        columns = ', '.join(self._COLUMNS)
        placeholders = ', '.join('?' * len(self._COLUMNS))
        statement = (
            f"INSERT OR REPLACE INTO packages ({columns}) "
            f"VALUES ({placeholders})"
        )
        timestamp = datetime.now().isoformat()

        conn = sqlite3.connect(self.metadata_db)
        try:
            # Values are listed in the same order as _COLUMNS.
            rows = [
                (
                    package.get('package', ''),
                    package.get('version', ''),
                    architecture,
                    suite,
                    component,
                    json.dumps(package.get('depends', [])),
                    json.dumps(package.get('recommends', [])),
                    json.dumps(package.get('suggests', [])),
                    json.dumps(package.get('conflicts', [])),
                    json.dumps(package.get('breaks', [])),
                    json.dumps(package.get('replaces', [])),
                    json.dumps(package.get('provides', [])),
                    package.get('essential', False),
                    package.get('priority', 'optional'),
                    package.get('size', 0),
                    package.get('md5sum', ''),
                    package.get('sha256', ''),
                    package.get('description', ''),
                    timestamp,
                )
                for package in packages
            ]

            with conn:  # commit on success, roll back on exception
                conn.executemany(statement, rows)

            print(f"Stored metadata for {len(packages)} packages")
            return True

        except (sqlite3.Error, TypeError, ValueError) as e:
            print(f"Failed to store metadata: {e}")
            return False

        finally:
            conn.close()

    def _update_last_sync(self, repository_url: str, suite: str):
        """Update last sync timestamp"""
        sync_info = {
            'repository': repository_url,
            'suite': suite,
            'last_sync': datetime.now().isoformat()
        }

        with open(self.last_sync_file, 'w', encoding='utf-8') as f:
            json.dump(sync_info, f, indent=2)

    def get_package_metadata(self, package_name: str,
                             suite: Optional[str] = None,
                             architecture: Optional[str] = None
                             ) -> List[PackageMetadata]:
        """Get package metadata from database

        Args:
            package_name: Exact package name to look up.
            suite: Restrict results to this suite when given.
            architecture: Restrict results to this architecture when given.

        Returns:
            One ``PackageMetadata`` per matching row; an empty list when
            nothing matches or the lookup fails.
        """
        query = (
            f"SELECT {', '.join(self._COLUMNS)} FROM packages WHERE name = ?"
        )
        params: List[Any] = [package_name]

        if suite:
            query += " AND suite = ?"
            params.append(suite)

        if architecture:
            query += " AND architecture = ?"
            params.append(architecture)

        conn = sqlite3.connect(self.metadata_db)
        try:
            conn.row_factory = sqlite3.Row
            # Fetch exactly once: a second fetchall() on the same cursor
            # is always empty.
            rows = conn.execute(query, params).fetchall()
            return [self._row_to_metadata(row) for row in rows]

        except (sqlite3.Error, TypeError, ValueError) as e:
            print(f"Failed to get package metadata: {e}")
            return []

        finally:
            conn.close()

    @staticmethod
    def _row_to_metadata(row: sqlite3.Row) -> PackageMetadata:
        """Build a ``PackageMetadata`` from one ``packages`` table row."""
        return PackageMetadata(
            name=row['name'],
            version=row['version'],
            architecture=row['architecture'],
            suite=row['suite'],
            component=row['component'],
            depends=json.loads(row['depends']),
            recommends=json.loads(row['recommends']),
            suggests=json.loads(row['suggests']),
            conflicts=json.loads(row['conflicts']),
            breaks=json.loads(row['breaks']),
            replaces=json.loads(row['replaces']),
            provides=json.loads(row['provides']),
            # SQLite stores booleans as 0/1.
            essential=bool(row['essential']),
            priority=row['priority'],
            size=row['size'],
            md5sum=row['md5sum'],
            sha256=row['sha256'],
            description=row['description'],
            last_updated=datetime.fromisoformat(row['last_updated']),
        )

    def get_sync_status(self) -> Dict[str, Any]:
        """Get synchronization status

        Returns:
            ``{'status': 'never_synced'}`` when no marker file exists,
            otherwise the recorded repository, suite and timestamp with
            ``'status': 'synced'``.

        Raises:
            json.JSONDecodeError: If the marker file is not valid JSON.
            KeyError: If the marker file lacks an expected key.
        """
        if not self.last_sync_file.exists():
            return {'status': 'never_synced'}

        with open(self.last_sync_file, 'r', encoding='utf-8') as f:
            sync_info = json.load(f)

        return {
            'status': 'synced',
            'last_sync': sync_info['last_sync'],
            'repository': sync_info['repository'],
            'suite': sync_info['suite']
        }


def main():
    """Test metadata synchronization"""
    sync = DebianPackageMetadataSync()

    # Test sync with Debian main repository
    repositories = [
        {
            'url': 'http://deb.debian.org/debian',
            'suite': 'bookworm',
            'components': ['main'],
            'architectures': ['amd64']
        }
    ]

    for repo in repositories:
        success = sync.sync_repository_metadata(
            repo['url'], repo['suite'], repo['components'],
            repo['architectures']
        )

        if success:
            print(f"Successfully synced {repo['suite']}")
        else:
            print(f"Failed to sync {repo['suite']}")

    # Show sync status
    status = sync.get_sync_status()
    print(f"Sync status: {status}")


if __name__ == "__main__":
    main()