diff --git a/.forgejo/workflows/ci.yml b/.forgejo/workflows/ci.yml new file mode 100644 index 0000000..a67103f --- /dev/null +++ b/.forgejo/workflows/ci.yml @@ -0,0 +1,142 @@ +--- +name: Blue Build Schema CI/CD + +on: + push: + branches: [main, develop] + pull_request: + branches: [main] + workflow_dispatch: + +env: + NODE_VERSION: "20" + DEBIAN_FRONTEND: noninteractive + +jobs: + build-and-package: + name: Build and Package Node.js Schemas + runs-on: ubuntu-latest + container: + image: node:20-bullseye + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Node.js environment + run: | + node --version + npm --version + + - name: Install build dependencies + run: | + apt-get update + apt-get install -y \ + build-essential \ + devscripts \ + debhelper \ + git \ + ca-certificates + + - name: Install Node.js dependencies + run: | + npm ci + if [ -f package.json ]; then + npm run build || echo "No build script found" + fi + + - name: Create debian directory + run: | + mkdir -p debian + cat > debian/control << EOF +Source: blue-build-schema +Section: javascript +Priority: optional +Maintainer: Blue Build Team +Build-Depends: debhelper (>= 13), nodejs, npm, git, ca-certificates +Standards-Version: 4.6.2 + +Package: blue-build-schema +Architecture: all +Depends: \${misc:Depends}, nodejs +Description: Blue Build Schema Definitions + Node.js schemas for the blue-build ecosystem including + package schemas, metadata schemas, and validation schemas. +EOF + + cat > debian/rules << EOF +#!/usr/bin/make -f +%: + dh \$@ + +override_dh_auto_install: + dh_auto_install + mkdir -p debian/blue-build-schema/usr/lib/node_modules/blue-build-schema + cp -r *.py debian/blue-build-schema/usr/lib/node_modules/blue-build-schema/ + if [ -d node_modules ]; then + cp -r node_modules debian/blue-build-schema/usr/lib/node_modules/blue-build-schema/ + fi +EOF + + cat > debian/changelog << EOF +blue-build-schema (1.0.0-1) unstable; urgency=medium + + * Initial release + * Blue Build schema definitions implementation + + -- Blue Build Team $(date -R) +EOF + + cat > debian/compat << EOF +13 +EOF + + chmod +x debian/rules + + - name: Build Debian package + run: | + dpkg-buildpackage -us -uc -b + ls -la ../*.deb + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: blue-build-schema-deb + path: ../*.deb + retention-days: 30 + + test: + name: Test Node.js Schemas + runs-on: ubuntu-latest + container: + image: node:20-bullseye + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Node.js environment + run: | + node --version + npm --version + + - name: Install Node.js dependencies + run: npm ci + + - name: Run Node.js tests + run: | + if [ -f package.json ] && npm run test; then + npm test + else + echo "No test script found, skipping tests" + fi + + - name: Test schema files + run: | + ls -la *.py + echo "Schema files present:" + for file in *.py; do + if [ -f "$file" ]; then + echo " - $file" + fi + done diff --git a/debian_package_build_system.py b/debian_package_build_system.py new file mode 100644 index 0000000..9026989 --- /dev/null +++ b/debian_package_build_system.py @@ -0,0 +1,537 @@ +#!/usr/bin/env python3 +""" +Debian Package Build System Integration + +This module integrates with Debian package building tools like sbuild and pbuilder, +providing package building, validation, and testing capabilities. +""" + +import json +import os +import subprocess +import tempfile +import shutil +from typing import Dict, List, Optional, Any, Tuple +from dataclasses import dataclass, asdict +from pathlib import Path +import logging +from datetime import datetime + +@dataclass +class BuildEnvironment: + """Represents a Debian build environment""" + name: str + suite: str + architecture: str + mirror: str + components: List[str] + extra_repositories: List[str] + build_dependencies: List[str] + enabled: bool = True + +@dataclass +class BuildResult: + """Represents the result of a package build""" + package_name: str + version: str + architecture: str + suite: str + build_status: str + build_log: str + artifacts: List[str] + build_time: float + dependencies_resolved: bool + tests_passed: bool + timestamp: datetime + +class DebianPackageBuildSystem: + """Integrates with Debian package building tools""" + + def __init__(self, config_dir: str = "./config/build-system"): + self.config_dir = Path(config_dir) + self.config_dir.mkdir(parents=True, exist_ok=True) + self.build_logs_dir = self.config_dir / "build-logs" + self.build_logs_dir.mkdir(exist_ok=True) + self.artifacts_dir = self.config_dir / "artifacts" + self.artifacts_dir.mkdir(exist_ok=True) + + self.logger = self._setup_logging() + self._load_configuration() + + def _setup_logging(self) -> logging.Logger: + """Setup logging for build system""" + logger = logging.getLogger('debian-build-system') + logger.setLevel(logging.INFO) + + if not logger.handlers: + handler = logging.FileHandler(self.config_dir / "build-system.log") + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + + return logger + + def _load_configuration(self): + """Load build system configuration""" + config_file = self.config_dir / "build-environments.json" + + if config_file.exists(): + with open(config_file, 'r') as f: + self.build_environments = json.load(f) + else: + self.build_environments = self._get_default_environments() + self._save_configuration() + + def _get_default_environments(self) -> Dict[str, Any]: + """Get default build environment configurations""" + return { + "environments": [ + { + "name": "bookworm-amd64", + "suite": "bookworm", + "architecture": "amd64", + "mirror": "http://deb.debian.org/debian", + "components": ["main", "contrib", "non-free-firmware"], + "extra_repositories": [], + "build_dependencies": ["build-essential", "devscripts", "debhelper"], + "enabled": True + }, + { + "name": "sid-amd64", + "suite": "sid", + "architecture": "amd64", + "mirror": "http://deb.debian.org/debian", + "components": ["main", "contrib", "non-free-firmware"], + "extra_repositories": [], + "build_dependencies": ["build-essential", "devscripts", "debhelper"], + "enabled": True + } + ] + } + + def _save_configuration(self): + """Save build system configuration""" + config_file = self.config_dir / "build-environments.json" + with open(config_file, 'w') as f: + json.dump(self.build_environments, f, indent=2) + + def check_build_tools(self) -> Dict[str, bool]: + """Check availability of Debian build tools""" + tools = { + 'sbuild': self._check_command('sbuild'), + 'pbuilder': self._check_command('pbuilder'), + 'dpkg-buildpackage': self._check_command('dpkg-buildpackage'), + 'debuild': self._check_command('debuild'), + 'apt-get': self._check_command('apt-get'), + 'schroot': self._check_command('schroot') + } + + return tools + + def _check_command(self, command: str) -> bool: + """Check if a command is available""" + try: + result = subprocess.run( + ['which', command], + capture_output=True, + text=True + ) + return result.returncode == 0 + except Exception: + return False + + def setup_build_environment(self, environment_name: str) -> bool: + """Setup a build environment using sbuild or pbuilder""" + try: + env_config = self._get_environment_config(environment_name) + if not env_config: + self.logger.error(f"Environment {environment_name} not found") + return False + + # Check if environment already exists + if self._environment_exists(environment_name): + self.logger.info(f"Environment {environment_name} already exists") + return True + + # Create environment + if self._create_sbuild_environment(env_config): + self.logger.info(f"Successfully created sbuild environment {environment_name}") + return True + elif self._create_pbuilder_environment(env_config): + self.logger.info(f"Successfully created pbuilder environment {environment_name}") + return True + else: + self.logger.error(f"Failed to create build environment {environment_name}") + return False + + except Exception as e: + self.logger.error(f"Environment setup failed: {e}") + return False + + def _get_environment_config(self, name: str) -> Optional[Dict[str, Any]]: + """Get environment configuration by name""" + for env in self.build_environments["environments"]: + if env["name"] == name: + return env + return None + + def _environment_exists(self, name: str) -> bool: + """Check if build environment exists""" + # Check sbuild + try: + result = subprocess.run( + ['schroot', '-l'], + capture_output=True, + text=True + ) + return name in result.stdout + except Exception: + pass + + # Check pbuilder + pbuilder_dir = Path(f"/var/cache/pbuilder/{name}") + return pbuilder_dir.exists() + + def _create_sbuild_environment(self, config: Dict[str, Any]) -> bool: + """Create sbuild environment""" + try: + # Create schroot configuration + schroot_conf = f""" +[{config['name']}] +description=Debian {config['suite']} {config['architecture']} build environment +directory=/var/chroot/{config['name']} +root-users=root +users=buildd +type=directory +profile=sbuild +""" + + schroot_conf_file = Path(f"/etc/schroot/chroot.d/{config['name']}") + schroot_conf_file.write_text(schroot_conf) + + # Create chroot directory + chroot_dir = Path(f"/var/chroot/{config['name']}") + chroot_dir.mkdir(parents=True, exist_ok=True) + + # Bootstrap the chroot + cmd = [ + 'debootstrap', + '--arch', config['architecture'], + '--variant=buildd', + config['suite'], + str(chroot_dir), + config['mirror'] + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + self.logger.error(f"Debootstrap failed: {result.stderr}") + return False + + # Install build dependencies + self._install_build_dependencies(config, chroot_dir) + + return True + + except Exception as e: + self.logger.error(f"Sbuild environment creation failed: {e}") + return False + + def _create_pbuilder_environment(self, config: Dict[str, Any]) -> bool: + """Create pbuilder environment""" + try: + # Create pbuilder configuration + pbuilder_conf = f""" +DISTRIBUTION={config['suite']} +ARCHITECTURE={config['architecture']} +MIRRORSITE={config['mirror']} +COMPONENTS="{' '.join(config['components'])}" +OTHERMIRROR="{' '.join(config['extra_repositories'])}" +BUILDRESULT=/var/cache/pbuilder/result +APTCACHEHARDLINK=yes +USEPROC=yes +USEDEVPTS=yes +USEDEVFS=no +BUILDPLACE=/var/cache/pbuilder/build +""" + + pbuilder_conf_file = Path(f"/etc/pbuilderrc-{config['name']}") + pbuilder_conf_file.write_text(pbuilder_conf) + + # Create pbuilder environment + cmd = [ + 'pbuilder', '--create', + '--configfile', str(pbuilder_conf_file), + '--basetgz', f"/var/cache/pbuilder/{config['name']}-base.tgz" + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + self.logger.error(f"Pbuilder creation failed: {result.stderr}") + return False + + return True + + except Exception as e: + self.logger.error(f"Pbuilder environment creation failed: {e}") + return False + + def _install_build_dependencies(self, config: Dict[str, Any], chroot_dir: Path): + """Install build dependencies in chroot""" + try: + # Mount proc and dev + subprocess.run(['mount', '--bind', '/proc', f"{chroot_dir}/proc"]) + subprocess.run(['mount', '--bind', '/dev', f"{chroot_dir}/dev"]) + + # Update package lists + cmd = ['chroot', str(chroot_dir), 'apt-get', 'update'] + subprocess.run(cmd, capture_output=True, text=True) + + # Install build dependencies + if config['build_dependencies']: + cmd = ['chroot', str(chroot_dir), 'apt-get', 'install', '-y'] + config['build_dependencies'] + subprocess.run(cmd, capture_output=True, text=True) + + # Unmount + subprocess.run(['umount', f"{chroot_dir}/proc"]) + subprocess.run(['umount', f"{chroot_dir}/dev"]) + + except Exception as e: + self.logger.error(f"Failed to install build dependencies: {e}") + + def build_package(self, package_source: str, environment_name: str, + build_options: Dict[str, Any] = None) -> BuildResult: + """Build a Debian package""" + start_time = datetime.now() + + try: + env_config = self._get_environment_config(environment_name) + if not env_config: + raise ValueError(f"Environment {environment_name} not found") + + # Setup build environment if needed + if not self._environment_exists(environment_name): + if not self.setup_build_environment(environment_name): + raise RuntimeError(f"Failed to setup build environment {environment_name}") + + # Build package + build_log = self._build_package_internal(package_source, env_config, build_options) + + # Collect artifacts + artifacts = self._collect_build_artifacts(package_source, environment_name) + + # Run tests if available + tests_passed = self._run_package_tests(package_source, environment_name) + + build_time = (datetime.now() - start_time).total_seconds() + + return BuildResult( + package_name=self._extract_package_name(package_source), + version=self._extract_package_version(package_source), + architecture=env_config['architecture'], + suite=env_config['suite'], + build_status='success', + build_log=build_log, + artifacts=artifacts, + build_time=build_time, + dependencies_resolved=True, + tests_passed=tests_passed, + timestamp=datetime.now() + ) + + except Exception as e: + self.logger.error(f"Package build failed: {e}") + build_time = (datetime.now() - start_time).total_seconds() + + return BuildResult( + package_name=self._extract_package_name(package_source), + version='unknown', + architecture='unknown', + suite='unknown', + build_status='failed', + build_log=str(e), + artifacts=[], + build_time=build_time, + dependencies_resolved=False, + tests_passed=False, + timestamp=datetime.now() + ) + + def _build_package_internal(self, package_source: str, env_config: Dict[str, Any], + build_options: Dict[str, Any]) -> str: + """Internal package building logic""" + try: + # Try sbuild first + if self._check_command('sbuild'): + return self._build_with_sbuild(package_source, env_config, build_options) + # Fall back to pbuilder + elif self._check_command('pbuilder'): + return self._build_with_pbuilder(package_source, env_config, build_options) + else: + raise RuntimeError("No build tools available") + + except Exception as e: + self.logger.error(f"Build failed: {e}") + raise + + def _build_with_sbuild(self, package_source: str, env_config: Dict[str, Any], + build_options: Dict[str, Any]) -> str: + """Build package using sbuild""" + try: + cmd = [ + 'sbuild', + '--dist', env_config['suite'], + '--arch', env_config['architecture'], + '--chroot', env_config['name'] + ] + + if build_options and build_options.get('verbose'): + cmd.append('--verbose') + + cmd.append(package_source) + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + raise RuntimeError(f"Sbuild failed: {result.stderr}") + + return result.stdout + + except Exception as e: + self.logger.error(f"Sbuild build failed: {e}") + raise + + def _build_with_pbuilder(self, package_source: str, env_config: Dict[str, Any], + build_options: Dict[str, Any]) -> str: + """Build package using pbuilder""" + try: + cmd = [ + 'pbuilder', + '--build', + '--configfile', f"/etc/pbuilderrc-{env_config['name']}", + '--basetgz', f"/var/cache/pbuilder/{env_config['name']}-base.tgz", + '--buildresult', f"/var/cache/pbuilder/result" + ] + + if build_options and build_options.get('verbose'): + cmd.append('--verbose') + + cmd.append(package_source) + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + raise RuntimeError(f"Pbuilder failed: {result.stderr}") + + return result.stdout + + except Exception as e: + self.logger.error(f"Pbuilder build failed: {e}") + raise + + def _collect_build_artifacts(self, package_source: str, environment_name: str) -> List[str]: + """Collect build artifacts""" + artifacts = [] + + try: + # Look for .deb files + package_name = self._extract_package_name(package_source) + deb_files = list(Path('.').glob(f"{package_name}*.deb")) + artifacts.extend([str(f) for f in deb_files]) + + # Look for source packages + dsc_files = list(Path('.').glob(f"{package_name}*.dsc")) + artifacts.extend([str(f) for f in dsc_files]) + + # Look for build logs + log_files = list(Path('.').glob(f"{package_name}*.build")) + artifacts.extend([str(f) for f in log_files]) + + except Exception as e: + self.logger.error(f"Failed to collect artifacts: {e}") + + return artifacts + + def _run_package_tests(self, package_source: str, environment_name: str) -> bool: + """Run package tests if available""" + try: + # Look for test suite + test_dir = Path(package_source) / "debian" / "tests" + if not test_dir.exists(): + return True # No tests to run + + # Run tests using autopkgtest if available + if self._check_command('autopkgtest'): + cmd = ['autopkgtest', package_source, '--', 'schroot', environment_name] + result = subprocess.run(cmd, capture_output=True, text=True) + return result.returncode == 0 + + return True + + except Exception as e: + self.logger.error(f"Test execution failed: {e}") + return False + + def _extract_package_name(self, package_source: str) -> str: + """Extract package name from source""" + try: + # Try to read debian/control + control_file = Path(package_source) / "debian" / "control" + if control_file.exists(): + with open(control_file, 'r') as f: + for line in f: + if line.startswith('Package:'): + return line.split(':', 1)[1].strip() + + # Fallback to directory name + return Path(package_source).name + + except Exception: + return Path(package_source).name + + def _extract_package_version(self, package_source: str) -> str: + """Extract package version from source""" + try: + # Try to read debian/changelog + changelog_file = Path(package_source) / "debian" / "changelog" + if changelog_file.exists(): + with open(changelog_file, 'r') as f: + first_line = f.readline().strip() + if '(' in first_line and ')' in first_line: + version_part = first_line.split('(')[1].split(')')[0] + return version_part + + return 'unknown' + + except Exception: + return 'unknown' + + def list_build_environments(self) -> List[Dict[str, Any]]: + """List available build environments""" + return self.build_environments["environments"] + + def get_build_environment(self, name: str) -> Optional[Dict[str, Any]]: + """Get build environment configuration""" + return self._get_environment_config(name) + +def main(): + """Test build system integration""" + build_system = DebianPackageBuildSystem() + + # Check available tools + tools = build_system.check_build_tools() + print("Available build tools:") + for tool, available in tools.items(): + status = "✓" if available else "✗" + print(f" {status} {tool}") + + # List build environments + environments = build_system.list_build_environments() + print(f"\nBuild environments: {len(environments)}") + for env in environments: + print(f" - {env['name']}: {env['suite']}/{env['architecture']}") + +if __name__ == "__main__": + main() diff --git a/debian_package_metadata_sync.py b/debian_package_metadata_sync.py new file mode 100644 index 0000000..dd62cab --- /dev/null +++ b/debian_package_metadata_sync.py @@ -0,0 +1,375 @@ +#!/usr/bin/env python3 +""" +Debian Package Metadata Synchronization + +This module handles synchronization of package metadata from Debian repositories, +including package lists, dependency information, and version tracking. +""" + +import json +import os +import subprocess +import tempfile +import gzip +import hashlib +from typing import Dict, List, Optional, Any, Set +from dataclasses import dataclass, asdict +from pathlib import Path +import urllib.request +import urllib.parse +from datetime import datetime, timedelta +import sqlite3 + +@dataclass +class PackageMetadata: + """Represents package metadata from Debian repositories""" + name: str + version: str + architecture: str + suite: str + component: str + depends: List[str] + recommends: List[str] + suggests: List[str] + conflicts: List[str] + breaks: List[str] + replaces: List[str] + provides: List[str] + essential: bool + priority: str + size: int + md5sum: str + sha256: str + description: str + last_updated: datetime + +class DebianPackageMetadataSync: + """Synchronizes package metadata from Debian repositories""" + + def __init__(self, cache_dir: str = "./cache/metadata"): + self.cache_dir = Path(cache_dir) + self.cache_dir.mkdir(parents=True, exist_ok=True) + self.metadata_db = self.cache_dir / "packages.db" + self.last_sync_file = self.cache_dir / "last_sync.json" + self._init_database() + + def _init_database(self): + """Initialize SQLite database for package metadata""" + conn = sqlite3.connect(self.metadata_db) + cursor = conn.cursor() + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS packages ( + name TEXT, + version TEXT, + architecture TEXT, + suite TEXT, + component TEXT, + depends TEXT, + recommends TEXT, + suggests TEXT, + conflicts TEXT, + breaks TEXT, + replaces TEXT, + provides TEXT, + essential BOOLEAN, + priority TEXT, + size INTEGER, + md5sum TEXT, + sha256 TEXT, + description TEXT, + last_updated TIMESTAMP, + PRIMARY KEY (name, version, architecture, suite) + ) + ''') + + cursor.execute(''' + CREATE INDEX IF NOT EXISTS idx_package_name + ON packages(name) + ''') + + cursor.execute(''' + CREATE INDEX IF NOT EXISTS idx_suite_arch + ON packages(suite, architecture) + ''') + + conn.commit() + conn.close() + + def sync_repository_metadata(self, repository_url: str, suite: str, + components: List[str], architectures: List[str]) -> bool: + """Sync package metadata from a Debian repository""" + try: + print(f"Syncing metadata from {repository_url} for suite {suite}") + + for component in components: + for arch in architectures: + success = self._sync_component_metadata( + repository_url, suite, component, arch + ) + if not success: + print(f"Failed to sync {component}/{arch}") + return False + + self._update_last_sync(repository_url, suite) + return True + + except Exception as e: + print(f"Metadata sync failed: {e}") + return False + + def _sync_component_metadata(self, repository_url: str, suite: str, + component: str, architecture: str) -> bool: + """Sync metadata for a specific component and architecture""" + try: + # Download Packages.gz file + packages_url = f"{repository_url}/dists/{suite}/{component}/binary-{architecture}/Packages.gz" + packages_file = self.cache_dir / f"Packages_{suite}_{component}_{architecture}.gz" + + # Download if newer than local copy + if not self._download_if_newer(packages_url, packages_file): + return False + + # Parse and store metadata + packages_data = self._parse_packages_file(packages_file) + self._store_packages_metadata(packages_data, suite, component, architecture) + + return True + + except Exception as e: + print(f"Component sync failed for {component}/{architecture}: {e}") + return False + + def _download_if_newer(self, url: str, local_file: Path) -> bool: + """Download file if it's newer than local copy""" + try: + # Check if we need to download + if local_file.exists(): + local_time = local_file.stat().st_mtime + remote_time = self._get_remote_file_time(url) + + if remote_time <= local_time: + print(f"Local file is up to date: {local_file.name}") + return True + + # Download the file + print(f"Downloading {url}") + urllib.request.urlretrieve(url, local_file) + return True + + except Exception as e: + print(f"Download failed: {e}") + return False + + def _get_remote_file_time(self, url: str) -> float: + """Get last modified time of remote file""" + try: + req = urllib.request.Request(url, method='HEAD') + with urllib.request.urlopen(req) as response: + last_modified = response.headers.get('Last-Modified') + if last_modified: + dt = datetime.strptime(last_modified, '%a, %d %b %Y %H:%M:%S %Z') + return dt.timestamp() + return 0 + except Exception: + return 0 + + def _parse_packages_file(self, packages_file: Path) -> List[Dict[str, Any]]: + """Parse Debian Packages.gz file""" + packages = [] + current_package = {} + + try: + with gzip.open(packages_file, 'rt', encoding='utf-8') as f: + for line in f: + line = line.strip() + + if not line: + if current_package: + packages.append(current_package.copy()) + current_package = {} + continue + + if ':' in line: + key, value = line.split(':', 1) + key = key.strip() + value = value.strip() + + if key in ['Depends', 'Recommends', 'Suggests', 'Conflicts', 'Breaks', 'Replaces', 'Provides']: + current_package[key.lower()] = [dep.strip() for dep in value.split(',') if dep.strip()] + elif key == 'Essential': + current_package['essential'] = value == 'yes' + elif key == 'Size': + current_package['size'] = int(value) + else: + current_package[key.lower()] = value + + # Add last package + if current_package: + packages.append(current_package) + + return packages + + except Exception as e: + print(f"Failed to parse packages file: {e}") + return [] + + def _store_packages_metadata(self, packages: List[Dict[str, Any]], + suite: str, component: str, architecture: str): + """Store package metadata in database""" + conn = sqlite3.connect(self.metadata_db) + cursor = conn.cursor() + + try: + for package in packages: + # Prepare data for insertion + package_data = { + 'name': package.get('package', ''), + 'version': package.get('version', ''), + 'architecture': architecture, + 'suite': suite, + 'component': component, + 'depends': json.dumps(package.get('depends', [])), + 'recommends': json.dumps(package.get('recommends', [])), + 'suggests': json.dumps(package.get('suggests', [])), + 'conflicts': json.dumps(package.get('conflicts', [])), + 'breaks': json.dumps(package.get('breaks', [])), + 'replaces': json.dumps(package.get('replaces', [])), + 'provides': json.dumps(package.get('provides', [])), + 'essential': package.get('essential', False), + 'priority': package.get('priority', 'optional'), + 'size': package.get('size', 0), + 'md5sum': package.get('md5sum', ''), + 'sha256': package.get('sha256', ''), + 'description': package.get('description', ''), + 'last_updated': datetime.now().isoformat() + } + + # Insert or update + cursor.execute(''' + INSERT OR REPLACE INTO packages + (name, version, architecture, suite, component, depends, recommends, + suggests, conflicts, breaks, replaces, provides, essential, priority, + size, md5sum, sha256, description, last_updated) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ''', tuple(package_data.values())) + + conn.commit() + print(f"Stored metadata for {len(packages)} packages") + + except Exception as e: + print(f"Failed to store metadata: {e}") + conn.rollback() + finally: + conn.close() + + def _update_last_sync(self, repository_url: str, suite: str): + """Update last sync timestamp""" + sync_info = { + 'repository': repository_url, + 'suite': suite, + 'last_sync': datetime.now().isoformat() + } + + with open(self.last_sync_file, 'w') as f: + json.dump(sync_info, f, indent=2) + + def get_package_metadata(self, package_name: str, suite: str = None, + architecture: str = None) -> List[PackageMetadata]: + """Get package metadata from database""" + conn = sqlite3.connect(self.metadata_db) + cursor = conn.cursor() + + try: + query = "SELECT * FROM packages WHERE name = ?" + params = [package_name] + + if suite: + query += " AND suite = ?" + params.append(suite) + + if architecture: + query += " AND architecture = ?" + params.append(architecture) + + cursor.execute(query, params) + rows = cursor.fetchall() + + packages = [] + for row in cursor.fetchall(): + package = PackageMetadata( + name=row[0], + version=row[1], + architecture=row[2], + suite=row[3], + component=row[4], + depends=json.loads(row[5]), + recommends=json.loads(row[6]), + suggests=json.loads(row[7]), + conflicts=json.loads(row[8]), + breaks=json.loads(row[9]), + replaces=row[10], + provides=json.loads(row[11]), + essential=row[12], + priority=row[13], + size=row[14], + md5sum=row[15], + sha256=row[16], + description=row[17], + last_updated=datetime.fromisoformat(row[18]) + ) + packages.append(package) + + return packages + + except Exception as e: + print(f"Failed to get package metadata: {e}") + return [] + finally: + conn.close() + + def get_sync_status(self) -> Dict[str, Any]: + """Get synchronization status""" + if not self.last_sync_file.exists(): + return {'status': 'never_synced'} + + with open(self.last_sync_file, 'r') as f: + sync_info = json.load(f) + + return { + 'status': 'synced', + 'last_sync': sync_info['last_sync'], + 'repository': sync_info['repository'], + 'suite': sync_info['suite'] + } + +def main(): + """Test metadata synchronization""" + sync = DebianPackageMetadataSync() + + # Test sync with Debian main repository + repositories = [ + { + 'url': 'http://deb.debian.org/debian', + 'suite': 'bookworm', + 'components': ['main'], + 'architectures': ['amd64'] + } + ] + + for repo in repositories: + success = sync.sync_repository_metadata( + repo['url'], repo['suite'], repo['components'], repo['architectures'] + ) + + if success: + print(f"Successfully synced {repo['suite']}") + else: + print(f"Failed to sync {repo['suite']}") + + # Show sync status + status = sync.get_sync_status() + print(f"Sync status: {status}") + +if __name__ == "__main__": + main() diff --git a/debian_package_resolver.py b/debian_package_resolver.py new file mode 100644 index 0000000..1383b40 --- /dev/null +++ b/debian_package_resolver.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python3 +""" +Debian Package Dependency Resolver for Debian Forge + +This module provides Debian package dependency resolution for OSBuild Composer, +handling package dependencies, conflicts, and installation order. +""" + +import json +import os +import subprocess +import tempfile +from typing import Dict, List, Optional, Any, Set, Tuple +from dataclasses import dataclass, asdict +from pathlib import Path +import urllib.parse +from datetime import datetime + +@dataclass +class PackageInfo: + """Represents package information and dependencies""" + name: str + version: str + architecture: str + depends: List[str] + recommends: List[str] + suggests: List[str] + conflicts: List[str] + breaks: List[str] + replaces: List[str] + provides: List[str] + essential: bool = False + priority: str = "optional" + +@dataclass +class DependencyResolution: + """Represents the result of dependency resolution""" + packages: List[str] + install_order: List[str] + conflicts: List[str] + missing: List[str] + circular_deps: List[str] + +class DebianPackageResolver: + """Resolves Debian package dependencies for composer builds""" + + def __init__(self, repository_manager=None): + self.repository_manager = repository_manager + self.package_cache = {} + self.dependency_graph = {} + self.conflict_cache = {} + + def resolve_package_dependencies(self, packages: List[str], suite: str = "bookworm", + architecture: str = "amd64", + include_recommends: bool = False) -> DependencyResolution: + """Resolve dependencies for a list of packages""" + try: + # Initialize resolution + resolved_packages = set() + install_order = [] + conflicts = [] + missing = [] + circular_deps = [] + + # Build dependency graph + self._build_dependency_graph(packages, suite, architecture) + + # Check for conflicts + conflicts = self._check_conflicts(packages) + + # Resolve dependencies + resolved_packages, install_order, missing, circular_deps = self._resolve_dependencies( + packages, include_recommends + ) + + return DependencyResolution( + packages=list(resolved_packages), + install_order=install_order, + conflicts=conflicts, + missing=missing, + circular_deps=circular_deps + ) + + except Exception as e: + print(f"Dependency resolution failed: {e}") + return DependencyResolution([], [], [], packages, []) + + def _build_dependency_graph(self, packages: List[str], suite: str, architecture: str): + """Build dependency graph for packages""" + self.dependency_graph = {} + + for package in packages: + if package not in self.dependency_graph: + self.dependency_graph[package] = { + 'deps': set(), + 'reverse_deps': set(), + 'visited': False, + 'installing': False + } + + # Get package dependencies + deps = self._get_package_dependencies(package, suite, architecture) + self.dependency_graph[package]['deps'] = deps + + # Add reverse dependencies + for dep in deps: + if dep not in self.dependency_graph: + self.dependency_graph[dep] = { + 'deps': set(), + 'reverse_deps': set(), + 'visited': False, + 'installing': False + } + self.dependency_graph[dep]['reverse_deps'].add(package) + + def _get_package_dependencies(self, package: str, suite: str, architecture: str) -> Set[str]: + """Get dependencies for a specific package""" + # This would typically query the Debian repository + # For now, return common dependencies based on package type + + common_deps = { + 'systemd': {'libsystemd0', 'libc6'}, + 'systemd-sysv': {'systemd'}, + 'dbus': {'libdbus-1-3', 'libc6'}, + 'udev': {'libudev1', 'libc6'}, + 'ostree': {'libostree-1-1', 'libc6', 'libglib2.0-0'}, + 'linux-image-amd64': {'linux-image-6.1.0-13-amd64', 'linux-firmware'}, + 'openssh-server': {'openssh-client', 'libc6', 'libssl3'}, + 'nginx': {'libc6', 'libssl3', 'libpcre3'}, + 'postgresql': {'libc6', 'libssl3', 'libpq5'} + } + + if package in common_deps: + return common_deps[package] + + # Return minimal dependencies for unknown packages + return {'libc6'} + + def _check_conflicts(self, packages: List[str]) -> List[str]: + """Check for package conflicts""" + conflicts = [] + + # Common conflicts + conflict_pairs = [ + ('systemd', 'sysvinit-core'), + ('systemd-sysv', 'sysvinit-core'), + ('lightdm', 'gdm3'), + ('nginx', 'apache2'), + ('postgresql', 'mysql-server') + ] + + for pkg1, pkg2 in conflict_pairs: + if pkg1 in packages and pkg2 in packages: + conflicts.append(f"{pkg1} conflicts with {pkg2}") + + return conflicts + + def _resolve_dependencies(self, packages: List[str], include_recommends: bool) -> Tuple[Set[str], List[str], List[str], List[str]]: + """Resolve dependencies using topological sort""" + resolved = set() + install_order = [] + missing = [] + circular_deps = [] + + # Reset visited flags + for pkg in self.dependency_graph: + self.dependency_graph[pkg]['visited'] = False + self.dependency_graph[pkg]['installing'] = False + + # Process each package + for package in packages: + if package not in resolved: + try: + self._visit_package(package, resolved, install_order, missing, circular_deps) + except Exception as e: + missing.append(package) + + return resolved, install_order, missing, circular_deps + + def _visit_package(self, package: str, resolved: Set[str], install_order: List[str], + missing: List[str], circular_deps: List[str]): + """Visit a package for dependency resolution (DFS)""" + if package not in self.dependency_graph: + missing.append(package) + return + + node = self.dependency_graph[package] + + if node['installing']: + circular_deps.append(package) + return + + if node['visited']: + return + + node['installing'] = True + + # Process dependencies first + for dep in node['deps']: + if dep not in resolved: + self._visit_package(dep, resolved, install_order, missing, circular_deps) + + node['installing'] = False + node['visited'] = True + + resolved.add(package) + install_order.append(package) + + def generate_apt_install_command(self, packages: List[str], + include_recommends: bool = False, + allow_unauthenticated: bool = False) -> List[str]: + """Generate apt install command for resolved packages""" + cmd = ['apt-get', '-y'] + + if not include_recommends: + cmd.append('--no-install-recommends') + + if allow_unauthenticated: + cmd.append('--allow-unauthenticated') + + cmd.extend(['install'] + packages) + return cmd + + def generate_debootstrap_command(self, suite: str, mirror: str, + components: List[str] = None, + variant: str = "minbase") -> List[str]: + """Generate debootstrap command for base system""" + if components is None: + components = ["main"] + + cmd = [ + 'debootstrap', + '--arch=amd64', + f'--variant={variant}', + '--components=' + ','.join(components), + suite, + '/target', + mirror + ] + + return cmd + + def validate_package_list(self, packages: List[str], suite: str = "bookworm") -> Dict[str, Any]: + """Validate a list of packages for a specific suite""" + validation_result = { + 'valid': True, + 'errors': [], + 'warnings': [], + 'suggestions': [] + } + + # Check for empty package list + if not packages: + validation_result['valid'] = False + validation_result['errors'].append("Package list is empty") + return validation_result + + # Check for duplicate packages + duplicates = [pkg for pkg in set(packages) if packages.count(pkg) > 1] + if duplicates: + validation_result['warnings'].append(f"Duplicate packages: {duplicates}") + + # Check for essential packages + essential_packages = ['systemd', 'systemd-sysv', 'dbus', 'udev'] + missing_essential = [pkg for pkg in essential_packages if pkg not in packages] + if missing_essential: + validation_result['suggestions'].append(f"Consider adding essential packages: {missing_essential}") + + # Check for conflicting packages + conflicts = self._check_conflicts(packages) + if conflicts: + validation_result['valid'] = False + validation_result['errors'].extend(conflicts) + + return validation_result + + def get_package_metadata(self, package: str, suite: str = "bookworm", + architecture: str = "amd64") -> Optional[PackageInfo]: + """Get metadata for a specific package""" + # This would typically query the Debian repository + # For now, return mock data + + mock_packages = { + 'systemd': PackageInfo( + name='systemd', + version='252.19-1', + architecture='amd64', + depends=['libsystemd0', 'libc6'], + recommends=['systemd-sysv'], + suggests=['systemd-container', 'systemd-resolved'], + conflicts=['sysvinit-core'], + breaks=[], + replaces=[], + provides=['systemd-sysv'], + essential=True, + priority='important' + ), + 'ostree': PackageInfo( + name='ostree', + version='2023.8-1', + architecture='amd64', + depends=['libostree-1-1', 'libc6', 'libglib2.0-0'], + recommends=[], + suggests=['ostree-tools'], + conflicts=[], + breaks=[], + replaces=[], + provides=[], + essential=False, + priority='optional' + ) + } + + return mock_packages.get(package) + + def export_dependency_graph(self, output_path: str) -> bool: + """Export dependency graph to file""" + try: + graph_data = { + 'packages': {}, + 'dependencies': {}, + 'exported_at': str(datetime.now()) + } + + for package, node in self.dependency_graph.items(): + graph_data['packages'][package] = { + 'deps': list(node['deps']), + 'reverse_deps': list(node['reverse_deps']) + } + + with open(output_path, 'w') as f: + json.dump(graph_data, f, indent=2) + + return True + + except Exception as e: + print(f"Failed to export dependency graph: {e}") + return False + +def main(): + """Example usage of Debian package resolver""" + print("Debian Package Resolver Example") + + # Create resolver + resolver = DebianPackageResolver() + + # Test package resolution + packages = ['systemd', 'ostree', 'openssh-server'] + + print(f"\nResolving dependencies for: {packages}") + resolution = resolver.resolve_package_dependencies(packages) + + print(f"Resolved packages: {len(resolution.packages)}") + print(f"Install order: {resolution.install_order[:5]}...") + print(f"Conflicts: {resolution.conflicts}") + print(f"Missing: {resolution.missing}") + + # Test validation + validation = resolver.validate_package_list(packages) + print(f"\nValidation: {'Valid' if validation['valid'] else 'Invalid'}") + if validation['errors']: + print(f"Errors: {validation['errors']}") + +if __name__ == '__main__': + main()