# pylint: disable=too-many-branches # pylint: disable=too-many-nested-blocks import itertools import os import os.path import tempfile import subprocess import json from datetime import datetime from typing import Dict, List, Any, Optional from osbuild.solver import ( DepsolveError, MarkingError, NoReposError, RepoError, SolverBase, modify_rootdir_path, read_keys, ) class APT(SolverBase): def __init__(self, request, persistdir, cache_dir, license_index_path=None): arch = request["arch"] releasever = request.get("releasever") proxy = request.get("proxy") arguments = request["arguments"] repos = arguments.get("repos", []) root_dir = arguments.get("root_dir") self.arch = arch self.releasever = releasever self.root_dir = root_dir self.cache_dir = cache_dir self.persistdir = persistdir self.proxy = proxy # APT configuration self.apt_config = { "APT::Architecture": arch, "APT::Default-Release": releasever or "trixie", "APT::Get::Assume-Yes": "true", "APT::Get::AllowUnauthenticated": "false", "APT::Get::Fix-Broken": "true", "APT::Get::Show-Upgraded": "true", "APT::Get::Show-User-Simulation-Note": "false", "APT::Install-Recommends": "false", "APT::Install-Suggests": "false", "APT::Cache::ShowFull": "true", "Dir::Etc::Trusted": "/etc/apt/trusted.gpg", "Dir::Etc::TrustedParts": "/etc/apt/trusted.gpg.d/", } # Set up proxy if provided if proxy: self.apt_config.update({ "Acquire::http::Proxy": proxy, "Acquire::https::Proxy": proxy, "Acquire::ftp::Proxy": proxy, }) # Repository configuration self.repos = [] for repo in repos: self._add_repository(repo) if not self.repos: raise NoReposError("No repositories configured") def _add_repository(self, repo_config): """Add a repository to the APT configuration.""" repo = { "name": repo_config.get("name", "unknown"), "baseurl": repo_config.get("baseurl", ""), "enabled": repo_config.get("enabled", True), "gpgcheck": repo_config.get("gpgcheck", True), "gpgkey": repo_config.get("gpgkey", []), "priority": repo_config.get("priority", 500), "components": repo_config.get("components", ["main"]), "architectures": repo_config.get("architectures", [self.arch]), } if not repo["baseurl"]: raise RepoError(f"Repository {repo['name']} has no baseurl") # Add GPG keys if specified if repo["gpgcheck"] and repo["gpgkey"]: try: keys = read_keys(repo["gpgkey"], self.root_dir) # In a real implementation, we would add these keys to the keyring # For now, we'll just validate they exist for key in keys: if not key.strip(): raise RepoError(f"Empty GPG key for repository {repo['name']}") except Exception as e: raise RepoError(f"Failed to read GPG keys for repository {repo['name']}: {e}") from e self.repos.append(repo) def _run_apt_command(self, command, args=None, env=None): """Run an APT command with proper configuration.""" if args is None: args = [] # Set up environment cmd_env = os.environ.copy() if env: cmd_env.update(env) # Add APT configuration apt_opts = [] for key, value in self.apt_config.items(): apt_opts.extend(["-o", f"{key}={value}"]) # Add root directory if specified if self.root_dir: apt_opts.extend(["-o", f"Dir={self.root_dir}"]) # Build command full_command = ["apt-get"] + apt_opts + [command] + args try: result = subprocess.run( full_command, capture_output=True, text=True, check=True, env=cmd_env, cwd=self.root_dir or "/" ) return result except subprocess.CalledProcessError as e: raise DepsolveError(f"APT command failed: {e.stderr}") from e def _run_apt_cache_command(self, command, args=None): """Run an apt-cache command with proper configuration.""" if args is None: args = [] # Set up APT configuration apt_opts = [] for key, value in self.apt_config.items(): apt_opts.extend(["-o", f"{key}={value}"]) # Add root directory if specified if self.root_dir: apt_opts.extend(["-o", f"Dir={self.root_dir}"]) # Build command full_command = ["apt-cache"] + apt_opts + [command] + args try: result = subprocess.run( full_command, capture_output=True, text=True, check=True, cwd=self.root_dir or "/" ) return result except subprocess.CalledProcessError as e: raise DepsolveError(f"apt-cache command failed: {e.stderr}") from e def _update_package_lists(self): """Update package lists from repositories.""" try: self._run_apt_command("update") except DepsolveError as e: raise RepoError(f"Failed to update package lists: {e}") from e def dump(self): """Dump the current APT configuration and package state.""" try: # Get package list result = self._run_apt_cache_command("pkgnames") packages = result.stdout.strip().split('\n') if result.stdout.strip() else [] # Get repository information repo_info = [] for repo in self.repos: repo_info.append({ "name": repo["name"], "baseurl": repo["baseurl"], "enabled": repo["enabled"], "priority": repo["priority"], "components": repo["components"], "architectures": repo["architectures"], }) return { "packages": packages, "repositories": repo_info, "architecture": self.arch, "releasever": self.releasever, "root_dir": self.root_dir, "timestamp": datetime.now().isoformat(), } except Exception as e: raise DepsolveError(f"Failed to dump APT state: {e}") from e def depsolve(self, arguments): """Resolve dependencies for the given packages.""" packages = arguments.get("packages", []) exclude_packages = arguments.get("exclude_packages", []) allow_erasing = arguments.get("allow_erasing", False) best = arguments.get("best", True) clean_requirements_on_remove = arguments.get("clean_requirements_on_remove", True) if not packages: return [] try: # Update package lists first self._update_package_lists() # Build apt-get command arguments apt_args = [] if best: apt_args.append("--fix-broken") if allow_erasing: apt_args.append("--allow-remove-essential") if clean_requirements_on_remove: apt_args.append("--auto-remove") # Add packages to install apt_args.extend(packages) # Add packages to exclude for pkg in exclude_packages: apt_args.extend(["--exclude", pkg]) # Run dependency resolution result = self._run_apt_command("install", apt_args, env={"DEBIAN_FRONTEND": "noninteractive"}) # Parse the output to get resolved packages resolved_packages = self._parse_apt_output(result.stdout) return resolved_packages except Exception as e: raise DepsolveError(f"Dependency resolution failed: {e}") from e def _parse_apt_output(self, output): """Parse apt-get output to extract resolved package information.""" packages = [] lines = output.split('\n') for line in lines: line = line.strip() if line.startswith(('Inst', 'Upgrading', 'Removing')): # Parse package installation/upgrade/removal lines parts = line.split() if len(parts) >= 2: action = parts[0] package_info = parts[1] # Extract package name and version if ':' in package_info: pkg_name, pkg_version = package_info.split(':', 1) else: pkg_name = package_info pkg_version = None packages.append({ "name": pkg_name, "version": pkg_version, "action": action, "arch": self.arch, }) return packages def search(self, args): """Search for packages matching the given criteria.""" query = args.get("query", "") match_type = args.get("match_type", "name") limit = args.get("limit", 100) if not query: return [] try: # Update package lists first self._update_package_lists() # Build search command search_args = [] if match_type == "name": search_args.extend(["--names-only", query]) elif match_type == "description": search_args.extend(["--full", query]) else: search_args.append(query) # Run search result = self._run_apt_cache_command("search", search_args) # Parse results packages = self._parse_search_output(result.stdout, limit) return packages except Exception as e: raise DepsolveError(f"Package search failed: {e}") from e def _parse_search_output(self, output, limit): """Parse apt-cache search output to extract package information.""" packages = [] lines = output.split('\n') for line in lines: if not line.strip() or len(packages) >= limit: break # Parse package name and description if ' - ' in line: pkg_name, description = line.split(' - ', 1) packages.append({ "name": pkg_name.strip(), "description": description.strip(), "arch": self.arch, }) return packages def get_package_info(self, package_name): """Get detailed information about a specific package.""" try: result = self._run_apt_cache_command("show", [package_name]) return self._parse_package_info(result.stdout) except Exception as e: raise DepsolveError(f"Failed to get package info for {package_name}: {e}") from e def _parse_package_info(self, output): """Parse apt-cache show output to extract package information.""" info = {} lines = output.split('\n') for line in lines: line = line.strip() if ':' in line: key, value = line.split(':', 1) key = key.strip().lower().replace(' ', '_') value = value.strip() info[key] = value return info def get_dependencies(self, package_name): """Get dependencies for a specific package.""" try: result = self._run_apt_cache_command("depends", [package_name]) return self._parse_dependencies(result.stdout) except Exception as e: raise DepsolveError(f"Failed to get dependencies for {package_name}: {e}") from e def _parse_dependencies(self, output): """Parse apt-cache depends output to extract dependency information.""" dependencies = { "depends": [], "recommends": [], "suggests": [], "conflicts": [], "breaks": [], "replaces": [], } lines = output.split('\n') current_type = None for line in lines: line = line.strip() if not line: continue if line.startswith('Depends:'): current_type = "depends" elif line.startswith('Recommends:'): current_type = "recommends" elif line.startswith('Suggests:'): current_type = "suggests" elif line.startswith('Conflicts:'): current_type = "conflicts" elif line.startswith('Breaks:'): current_type = "breaks" elif line.startswith('Replaces:'): current_type = "replaces" elif current_type and line.startswith(' '): # Continuation line dep = line.strip() if dep: dependencies[current_type].append(dep) elif current_type and not line.startswith(' '): # New dependency type current_type = None return dependencies