particle-os-schema/debian_package_metadata_sync.py

#!/usr/bin/env python3
"""
Debian Package Metadata Synchronization
This module handles synchronization of package metadata from Debian repositories,
including package lists, dependency information, and version tracking.
"""
import json
import os
import subprocess
import tempfile
import gzip
import hashlib
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, asdict
from pathlib import Path
import urllib.request
import urllib.parse
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
import sqlite3
@dataclass
class PackageMetadata:
"""Represents package metadata from Debian repositories"""
name: str
version: str
architecture: str
suite: str
component: str
depends: List[str]
recommends: List[str]
suggests: List[str]
conflicts: List[str]
breaks: List[str]
replaces: List[str]
provides: List[str]
essential: bool
priority: str
size: int
md5sum: str
sha256: str
description: str
last_updated: datetime
class DebianPackageMetadataSync:
"""Synchronizes package metadata from Debian repositories"""
def __init__(self, cache_dir: str = "./cache/metadata"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
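        # Files kept under cache_dir:
        #   packages.db                             - SQLite database of parsed package records
        #   last_sync.json                          - details of the most recent sync
        #   Packages_<suite>_<component>_<arch>.gz  - downloaded package index files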
self.metadata_db = self.cache_dir / "packages.db"
self.last_sync_file = self.cache_dir / "last_sync.json"
self._init_database()
def _init_database(self):
"""Initialize SQLite database for package metadata"""
conn = sqlite3.connect(self.metadata_db)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS packages (
name TEXT,
version TEXT,
architecture TEXT,
suite TEXT,
component TEXT,
depends TEXT,
recommends TEXT,
suggests TEXT,
conflicts TEXT,
breaks TEXT,
replaces TEXT,
provides TEXT,
essential BOOLEAN,
priority TEXT,
size INTEGER,
md5sum TEXT,
sha256 TEXT,
description TEXT,
last_updated TIMESTAMP,
PRIMARY KEY (name, version, architecture, suite)
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_package_name
ON packages(name)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_suite_arch
ON packages(suite, architecture)
''')
conn.commit()
conn.close()
def sync_repository_metadata(self, repository_url: str, suite: str,
components: List[str], architectures: List[str]) -> bool:
"""Sync package metadata from a Debian repository"""
try:
print(f"Syncing metadata from {repository_url} for suite {suite}")
for component in components:
for arch in architectures:
success = self._sync_component_metadata(
repository_url, suite, component, arch
)
if not success:
print(f"Failed to sync {component}/{arch}")
return False
self._update_last_sync(repository_url, suite)
return True
except Exception as e:
print(f"Metadata sync failed: {e}")
return False
def _sync_component_metadata(self, repository_url: str, suite: str,
component: str, architecture: str) -> bool:
"""Sync metadata for a specific component and architecture"""
try:
# Download Packages.gz file
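            # Standard Debian archive layout:
            #   <mirror>/dists/<suite>/<component>/binary-<arch>/Packages.gz
            # e.g. http://deb.debian.org/debian/dists/bookworm/main/binary-amd64/Packages.gz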
packages_url = f"{repository_url}/dists/{suite}/{component}/binary-{architecture}/Packages.gz"
packages_file = self.cache_dir / f"Packages_{suite}_{component}_{architecture}.gz"
# Download if newer than local copy
if not self._download_if_newer(packages_url, packages_file):
return False
# Parse and store metadata
packages_data = self._parse_packages_file(packages_file)
self._store_packages_metadata(packages_data, suite, component, architecture)
return True
except Exception as e:
print(f"Component sync failed for {component}/{architecture}: {e}")
return False
def _download_if_newer(self, url: str, local_file: Path) -> bool:
"""Download file if it's newer than local copy"""
try:
# Check if we need to download
if local_file.exists():
local_time = local_file.stat().st_mtime
remote_time = self._get_remote_file_time(url)
                # Skip the download only when the remote mtime is known and not newer;
                # a failed HEAD request (remote_time == 0) forces a re-download.
                if remote_time and remote_time <= local_time:
                    print(f"Local file is up to date: {local_file.name}")
                    return True
# Download the file
print(f"Downloading {url}")
urllib.request.urlretrieve(url, local_file)
return True
except Exception as e:
print(f"Download failed: {e}")
return False
def _get_remote_file_time(self, url: str) -> float:
"""Get last modified time of remote file"""
try:
req = urllib.request.Request(url, method='HEAD')
with urllib.request.urlopen(req) as response:
last_modified = response.headers.get('Last-Modified')
                if last_modified:
                    # Last-Modified is an RFC 2822 date in GMT; parsedate_to_datetime
                    # returns an aware datetime, so timestamp() yields the correct epoch.
                    return parsedate_to_datetime(last_modified).timestamp()
return 0
except Exception:
return 0
def _parse_packages_file(self, packages_file: Path) -> List[Dict[str, Any]]:
"""Parse Debian Packages.gz file"""
packages = []
current_package = {}
try:
with gzip.open(packages_file, 'rt', encoding='utf-8') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    # Continuation lines (e.g. the long Description text) start with
                    # whitespace; keep only the first line of such fields.
                    if line and raw_line[:1] in (' ', '\t'):
                        continue
if not line:
if current_package:
packages.append(current_package.copy())
current_package = {}
continue
if ':' in line:
key, value = line.split(':', 1)
key = key.strip()
value = value.strip()
if key in ['Depends', 'Recommends', 'Suggests', 'Conflicts', 'Breaks', 'Replaces', 'Provides']:
current_package[key.lower()] = [dep.strip() for dep in value.split(',') if dep.strip()]
elif key == 'Essential':
current_package['essential'] = value == 'yes'
elif key == 'Size':
current_package['size'] = int(value)
else:
current_package[key.lower()] = value
# Add last package
if current_package:
packages.append(current_package)
return packages
except Exception as e:
print(f"Failed to parse packages file: {e}")
return []
def _store_packages_metadata(self, packages: List[Dict[str, Any]],
suite: str, component: str, architecture: str):
"""Store package metadata in database"""
conn = sqlite3.connect(self.metadata_db)
cursor = conn.cursor()
try:
for package in packages:
# Prepare data for insertion
package_data = {
'name': package.get('package', ''),
'version': package.get('version', ''),
'architecture': architecture,
'suite': suite,
'component': component,
'depends': json.dumps(package.get('depends', [])),
'recommends': json.dumps(package.get('recommends', [])),
'suggests': json.dumps(package.get('suggests', [])),
'conflicts': json.dumps(package.get('conflicts', [])),
'breaks': json.dumps(package.get('breaks', [])),
'replaces': json.dumps(package.get('replaces', [])),
'provides': json.dumps(package.get('provides', [])),
'essential': package.get('essential', False),
'priority': package.get('priority', 'optional'),
'size': package.get('size', 0),
'md5sum': package.get('md5sum', ''),
'sha256': package.get('sha256', ''),
'description': package.get('description', ''),
'last_updated': datetime.now().isoformat()
}
# Insert or update
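                # tuple(package_data.values()) relies on the dict insertion order
                # matching the column list below (guaranteed in Python 3.7+).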
cursor.execute('''
INSERT OR REPLACE INTO packages
(name, version, architecture, suite, component, depends, recommends,
suggests, conflicts, breaks, replaces, provides, essential, priority,
size, md5sum, sha256, description, last_updated)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', tuple(package_data.values()))
conn.commit()
print(f"Stored metadata for {len(packages)} packages")
except Exception as e:
print(f"Failed to store metadata: {e}")
conn.rollback()
finally:
conn.close()
def _update_last_sync(self, repository_url: str, suite: str):
"""Update last sync timestamp"""
sync_info = {
'repository': repository_url,
'suite': suite,
'last_sync': datetime.now().isoformat()
}
with open(self.last_sync_file, 'w') as f:
json.dump(sync_info, f, indent=2)
    def get_package_metadata(self, package_name: str, suite: Optional[str] = None,
                             architecture: Optional[str] = None) -> List[PackageMetadata]:
"""Get package metadata from database"""
conn = sqlite3.connect(self.metadata_db)
cursor = conn.cursor()
try:
query = "SELECT * FROM packages WHERE name = ?"
params = [package_name]
if suite:
query += " AND suite = ?"
params.append(suite)
if architecture:
query += " AND architecture = ?"
params.append(architecture)
cursor.execute(query, params)
rows = cursor.fetchall()
packages = []
            for row in rows:
package = PackageMetadata(
name=row[0],
version=row[1],
architecture=row[2],
suite=row[3],
component=row[4],
depends=json.loads(row[5]),
recommends=json.loads(row[6]),
suggests=json.loads(row[7]),
conflicts=json.loads(row[8]),
breaks=json.loads(row[9]),
                    replaces=json.loads(row[10]),
provides=json.loads(row[11]),
                    essential=bool(row[12]),
priority=row[13],
size=row[14],
md5sum=row[15],
sha256=row[16],
description=row[17],
last_updated=datetime.fromisoformat(row[18])
)
packages.append(package)
return packages
except Exception as e:
print(f"Failed to get package metadata: {e}")
return []
finally:
conn.close()
def get_sync_status(self) -> Dict[str, Any]:
"""Get synchronization status"""
if not self.last_sync_file.exists():
return {'status': 'never_synced'}
with open(self.last_sync_file, 'r') as f:
sync_info = json.load(f)
return {
'status': 'synced',
'last_sync': sync_info['last_sync'],
'repository': sync_info['repository'],
'suite': sync_info['suite']
}
def main():
"""Test metadata synchronization"""
sync = DebianPackageMetadataSync()
# Test sync with Debian main repository
repositories = [
{
'url': 'http://deb.debian.org/debian',
'suite': 'bookworm',
'components': ['main'],
'architectures': ['amd64']
}
]
for repo in repositories:
success = sync.sync_repository_metadata(
repo['url'], repo['suite'], repo['components'], repo['architectures']
)
if success:
print(f"Successfully synced {repo['suite']}")
else:
print(f"Failed to sync {repo['suite']}")
# Show sync status
status = sync.get_sync_status()
print(f"Sync status: {status}")
if __name__ == "__main__":
main()