375 lines
13 KiB
Python
375 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Debian Package Metadata Synchronization
|
|
|
|
This module handles synchronization of package metadata from Debian repositories,
|
|
including package lists, dependency information, and version tracking.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import gzip
|
|
import hashlib
|
|
from typing import Dict, List, Optional, Any, Set
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
import urllib.request
|
|
import urllib.parse
|
|
from datetime import datetime, timedelta
|
|
import sqlite3
|
|
|
|
@dataclass
class PackageMetadata:
    """Represents package metadata from Debian repositories.

    Mirrors one row of the ``packages`` SQLite table maintained by
    DebianPackageMetadataSync; the List[str] relationship fields are
    stored in the database as JSON-encoded strings.
    """
    name: str                # Binary package name (the "Package" field)
    version: str             # Debian version string
    architecture: str        # Target architecture, e.g. "amd64"
    suite: str               # Distribution suite, e.g. "bookworm"
    component: str           # Archive component, e.g. "main"
    depends: List[str]       # Comma-split entries of the "Depends" field
    recommends: List[str]    # Comma-split "Recommends" entries
    suggests: List[str]      # Comma-split "Suggests" entries
    conflicts: List[str]     # Comma-split "Conflicts" entries
    breaks: List[str]        # Comma-split "Breaks" entries
    replaces: List[str]      # Comma-split "Replaces" entries
    provides: List[str]      # Comma-split "Provides" entries
    essential: bool          # True when the "Essential" field is "yes"
    priority: str            # "Priority" field (defaults to "optional")
    size: int                # "Size" field from the Packages index
    md5sum: str              # "MD5sum" field from the Packages index
    sha256: str              # "SHA256" field from the Packages index
    description: str         # Short description line
    last_updated: datetime   # When this record was last written to the DB
|
|
|
|
class DebianPackageMetadataSync:
    """Synchronizes package metadata from Debian repositories.

    ``Packages.gz`` indices are downloaded per suite / component /
    architecture, parsed into per-package dicts, and stored in a local
    SQLite database keyed on (name, version, architecture, suite).
    List-valued relationship fields (Depends, Provides, ...) are
    serialized into their TEXT columns as JSON strings.
    """

    def __init__(self, cache_dir: str = "./cache/metadata"):
        """Create the cache directory and metadata database if needed.

        Args:
            cache_dir: Directory for downloaded index files, the SQLite
                database, and the last-sync marker file.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.metadata_db = self.cache_dir / "packages.db"
        self.last_sync_file = self.cache_dir / "last_sync.json"
        self._init_database()

    def _init_database(self):
        """Create the ``packages`` table and its indexes if missing."""
        conn = sqlite3.connect(self.metadata_db)
        try:
            cursor = conn.cursor()

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS packages (
                    name TEXT,
                    version TEXT,
                    architecture TEXT,
                    suite TEXT,
                    component TEXT,
                    depends TEXT,
                    recommends TEXT,
                    suggests TEXT,
                    conflicts TEXT,
                    breaks TEXT,
                    replaces TEXT,
                    provides TEXT,
                    essential BOOLEAN,
                    priority TEXT,
                    size INTEGER,
                    md5sum TEXT,
                    sha256 TEXT,
                    description TEXT,
                    last_updated TIMESTAMP,
                    PRIMARY KEY (name, version, architecture, suite)
                )
            ''')

            cursor.execute('''
                CREATE INDEX IF NOT EXISTS idx_package_name
                ON packages(name)
            ''')

            cursor.execute('''
                CREATE INDEX IF NOT EXISTS idx_suite_arch
                ON packages(suite, architecture)
            ''')

            conn.commit()
        finally:
            conn.close()

    def sync_repository_metadata(self, repository_url: str, suite: str,
                                 components: List[str], architectures: List[str]) -> bool:
        """Sync package metadata from a Debian repository.

        Args:
            repository_url: Base URL of the repository (no trailing slash).
            suite: Distribution suite, e.g. "bookworm".
            components: Components to sync, e.g. ["main"].
            architectures: Architectures to sync, e.g. ["amd64"].

        Returns:
            True when every component/architecture pair synced; False on
            the first failure (remaining pairs are not attempted).
        """
        try:
            print(f"Syncing metadata from {repository_url} for suite {suite}")

            for component in components:
                for arch in architectures:
                    success = self._sync_component_metadata(
                        repository_url, suite, component, arch
                    )
                    if not success:
                        print(f"Failed to sync {component}/{arch}")
                        return False

            # Record a successful full pass.
            self._update_last_sync(repository_url, suite)
            return True

        except Exception as e:
            print(f"Metadata sync failed: {e}")
            return False

    def _sync_component_metadata(self, repository_url: str, suite: str,
                                 component: str, architecture: str) -> bool:
        """Sync metadata for one component/architecture pair.

        Downloads the ``Packages.gz`` index (if newer than the cached
        copy), parses it, and stores the results in the database.
        """
        try:
            # Standard Debian archive layout for binary package indices.
            packages_url = f"{repository_url}/dists/{suite}/{component}/binary-{architecture}/Packages.gz"
            packages_file = self.cache_dir / f"Packages_{suite}_{component}_{architecture}.gz"

            # Download only if the remote copy is newer than the local one.
            if not self._download_if_newer(packages_url, packages_file):
                return False

            # Parse and store metadata.
            packages_data = self._parse_packages_file(packages_file)
            self._store_packages_metadata(packages_data, suite, component, architecture)

            return True

        except Exception as e:
            print(f"Component sync failed for {component}/{architecture}: {e}")
            return False

    def _download_if_newer(self, url: str, local_file: Path) -> bool:
        """Download *url* to *local_file* unless the local copy is current.

        Returns:
            True when the local file is usable afterwards (already fresh
            or freshly downloaded); False when the download failed.
        """
        try:
            if local_file.exists():
                local_time = local_file.stat().st_mtime
                remote_time = self._get_remote_file_time(url)

                # NOTE(review): _get_remote_file_time returns 0 when the HEAD
                # request fails or Last-Modified is absent, which keeps the
                # cached copy.  Looks like deliberate best-effort behavior —
                # confirm a stale cache is acceptable in that case.
                if remote_time <= local_time:
                    print(f"Local file is up to date: {local_file.name}")
                    return True

            print(f"Downloading {url}")
            urllib.request.urlretrieve(url, local_file)
            return True

        except Exception as e:
            print(f"Download failed: {e}")
            return False

    def _get_remote_file_time(self, url: str) -> float:
        """Return the remote file's Last-Modified time as a Unix timestamp.

        Returns 0 when the HEAD request fails or the header is missing.
        """
        try:
            req = urllib.request.Request(url, method='HEAD')
            with urllib.request.urlopen(req) as response:
                last_modified = response.headers.get('Last-Modified')
                if last_modified:
                    # HTTP-date format, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
                    dt = datetime.strptime(last_modified, '%a, %d %b %Y %H:%M:%S %Z')
                    return dt.timestamp()
                return 0
        except Exception:
            return 0

    def _parse_packages_file(self, packages_file: Path) -> List[Dict[str, Any]]:
        """Parse a Debian ``Packages.gz`` index into a list of dicts.

        Field names become lowercase dict keys.  Relationship fields are
        split on commas into lists, "Essential" becomes a bool, and
        "Size" an int.  Blank lines separate package stanzas.

        Returns:
            One dict per package stanza; empty list on parse failure.
        """
        packages = []
        current_package = {}

        try:
            with gzip.open(packages_file, 'rt', encoding='utf-8') as f:
                for raw_line in f:
                    # BUG FIX: continuation lines (leading whitespace, i.e.
                    # long-description text) were previously stripped and, when
                    # they contained a colon, misread as new fields.  Skip them.
                    if raw_line[:1] in (' ', '\t') and raw_line.strip():
                        continue

                    line = raw_line.strip()

                    if not line:
                        # Blank line terminates the current stanza.
                        if current_package:
                            packages.append(current_package.copy())
                            current_package = {}
                        continue

                    if ':' in line:
                        key, value = line.split(':', 1)
                        key = key.strip()
                        value = value.strip()

                        if key in ['Depends', 'Recommends', 'Suggests', 'Conflicts', 'Breaks', 'Replaces', 'Provides']:
                            current_package[key.lower()] = [dep.strip() for dep in value.split(',') if dep.strip()]
                        elif key == 'Essential':
                            current_package['essential'] = value == 'yes'
                        elif key == 'Size':
                            current_package['size'] = int(value)
                        else:
                            current_package[key.lower()] = value

            # Flush the final stanza (the file need not end with a blank line).
            if current_package:
                packages.append(current_package)

            return packages

        except Exception as e:
            print(f"Failed to parse packages file: {e}")
            return []

    def _store_packages_metadata(self, packages: List[Dict[str, Any]],
                                 suite: str, component: str, architecture: str):
        """Store parsed package stanzas in the database.

        Rows with an existing (name, version, architecture, suite)
        primary key are replaced.  List fields are JSON-encoded.
        """
        conn = sqlite3.connect(self.metadata_db)
        cursor = conn.cursor()

        try:
            # One timestamp for the whole batch.
            now = datetime.now().isoformat()

            for package in packages:
                # Explicit tuple in column order (the original relied on dict
                # insertion order matching the SQL column list).
                row = (
                    package.get('package', ''),
                    package.get('version', ''),
                    architecture,
                    suite,
                    component,
                    json.dumps(package.get('depends', [])),
                    json.dumps(package.get('recommends', [])),
                    json.dumps(package.get('suggests', [])),
                    json.dumps(package.get('conflicts', [])),
                    json.dumps(package.get('breaks', [])),
                    json.dumps(package.get('replaces', [])),
                    json.dumps(package.get('provides', [])),
                    package.get('essential', False),
                    package.get('priority', 'optional'),
                    package.get('size', 0),
                    package.get('md5sum', ''),
                    package.get('sha256', ''),
                    package.get('description', ''),
                    now,
                )

                cursor.execute('''
                    INSERT OR REPLACE INTO packages
                    (name, version, architecture, suite, component, depends, recommends,
                     suggests, conflicts, breaks, replaces, provides, essential, priority,
                     size, md5sum, sha256, description, last_updated)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', row)

            conn.commit()
            print(f"Stored metadata for {len(packages)} packages")

        except Exception as e:
            print(f"Failed to store metadata: {e}")
            conn.rollback()
        finally:
            conn.close()

    def _update_last_sync(self, repository_url: str, suite: str):
        """Write the last-sync marker file with the current timestamp."""
        sync_info = {
            'repository': repository_url,
            'suite': suite,
            'last_sync': datetime.now().isoformat()
        }

        with open(self.last_sync_file, 'w') as f:
            json.dump(sync_info, f, indent=2)

    def get_package_metadata(self, package_name: str, suite: Optional[str] = None,
                             architecture: Optional[str] = None) -> List[PackageMetadata]:
        """Look up stored metadata for a package.

        Args:
            package_name: Exact package name to look up.
            suite: Optional suite filter.
            architecture: Optional architecture filter.

        Returns:
            Matching PackageMetadata records; empty list when nothing
            matches or on error.
        """
        conn = sqlite3.connect(self.metadata_db)
        cursor = conn.cursor()

        try:
            query = "SELECT * FROM packages WHERE name = ?"
            params = [package_name]

            if suite:
                query += " AND suite = ?"
                params.append(suite)

            if architecture:
                query += " AND architecture = ?"
                params.append(architecture)

            cursor.execute(query, params)
            # BUG FIX: the original fetched the rows, then iterated a SECOND
            # fetchall() call, which is always empty — so this method could
            # never return any results.
            rows = cursor.fetchall()

            packages = []
            for row in rows:
                package = PackageMetadata(
                    name=row[0],
                    version=row[1],
                    architecture=row[2],
                    suite=row[3],
                    component=row[4],
                    depends=json.loads(row[5]),
                    recommends=json.loads(row[6]),
                    suggests=json.loads(row[7]),
                    conflicts=json.loads(row[8]),
                    breaks=json.loads(row[9]),
                    # BUG FIX: was the raw JSON string, unlike every sibling field.
                    replaces=json.loads(row[10]),
                    provides=json.loads(row[11]),
                    # SQLite stores booleans as 0/1; normalize back to bool.
                    essential=bool(row[12]),
                    priority=row[13],
                    size=row[14],
                    md5sum=row[15],
                    sha256=row[16],
                    description=row[17],
                    last_updated=datetime.fromisoformat(row[18])
                )
                packages.append(package)

            return packages

        except Exception as e:
            print(f"Failed to get package metadata: {e}")
            return []
        finally:
            conn.close()

    def get_sync_status(self) -> Dict[str, Any]:
        """Return the last-sync info, or a never-synced marker.

        Returns:
            ``{'status': 'never_synced'}`` when no sync has completed,
            otherwise the repository/suite/timestamp of the last sync.
        """
        if not self.last_sync_file.exists():
            return {'status': 'never_synced'}

        with open(self.last_sync_file, 'r') as f:
            sync_info = json.load(f)

        return {
            'status': 'synced',
            'last_sync': sync_info['last_sync'],
            'repository': sync_info['repository'],
            'suite': sync_info['suite']
        }
|
|
|
|
def main():
    """Demo driver: sync one Debian repository and report status."""
    syncer = DebianPackageMetadataSync()

    # Repositories to exercise: Debian main archive, bookworm/main/amd64.
    repositories = [
        {
            'url': 'http://deb.debian.org/debian',
            'suite': 'bookworm',
            'components': ['main'],
            'architectures': ['amd64'],
        },
    ]

    for repo in repositories:
        ok = syncer.sync_repository_metadata(
            repo['url'], repo['suite'], repo['components'], repo['architectures']
        )
        outcome = "Successfully synced" if ok else "Failed to sync"
        print(f"{outcome} {repo['suite']}")

    # Report the recorded sync state.
    print(f"Sync status: {syncer.get_sync_status()}")
|
|
|
|
# Run the demo sync only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|