particle-os-tools/src/apt-ostree.py/core/package_manager.py

466 lines
No EOL
17 KiB
Python

"""
High-level APT operations for apt-ostree.
This module provides high-level package management operations using python-apt,
including package installation, removal, upgrades, and dependency resolution.
"""
import logging
from typing import List, Dict, Any, Callable, Optional
import apt
import apt_pkg
from apt.cache import Cache
from apt.progress.base import AcquireProgress, InstallProgress
from .exceptions import PackageManagerError
logger = logging.getLogger(__name__)
class PackageManager:
"""
High-level APT package manager for apt-ostree.
This class provides high-level operations for package management using
python-apt, including package installation, removal, upgrades, and
dependency resolution.
"""
def __init__(self):
"""Initialize the package manager."""
self._cache: Optional[Cache] = None
self._acquire_progress: Optional[AcquireProgress] = None
self._install_progress: Optional[InstallProgress] = None
@property
def cache(self) -> Cache:
"""Get the APT cache, initializing it if necessary."""
if self._cache is None:
try:
self._cache = Cache()
except Exception as e:
raise PackageManagerError(f"Failed to initialize APT cache: {e}") from e
return self._cache
def update_package_lists(self, progress_callback: Optional[Callable] = None) -> bool:
"""
Refresh package lists from repositories.
Args:
progress_callback: Optional callback for progress updates
Returns:
True if successful, False otherwise
Raises:
PackageManagerError: If package list update fails
"""
try:
logger.info("Updating package lists")
if progress_callback:
progress_callback("Updating package lists...", 0)
# Update the cache
self.cache.update()
if progress_callback:
progress_callback("Package lists updated successfully", 100)
logger.info("Package lists updated successfully")
return True
except Exception as e:
error_msg = f"Failed to update package lists: {e}"
logger.error(error_msg)
if progress_callback:
progress_callback(f"Error: {error_msg}", -1)
raise PackageManagerError(error_msg) from e
def install_packages(self, packages: List[str], live_install: bool = False,
progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
"""
Install packages using APT.
Args:
packages: List of package names to install
live_install: Whether to perform live installation
progress_callback: Optional callback for progress updates
Returns:
Dictionary with installation results
Raises:
PackageManagerError: If package installation fails
"""
try:
logger.info(f"Installing packages: {packages}")
if progress_callback:
progress_callback(f"Installing packages: {', '.join(packages)}", 0)
# Mark packages for installation
for package in packages:
if package in self.cache:
pkg = self.cache[package]
if pkg.is_installed:
logger.info(f"Package {package} is already installed")
continue
pkg.mark_install()
logger.info(f"Marked {package} for installation")
else:
raise PackageManagerError(f"Package {package} not found in repositories")
# Commit changes
if progress_callback:
progress_callback("Committing package changes...", 50)
self.cache.commit()
if progress_callback:
progress_callback("Package installation completed", 100)
logger.info("Package installation completed successfully")
return {
"success": True,
"packages": packages,
"message": "Packages installed successfully"
}
except Exception as e:
error_msg = f"Failed to install packages {packages}: {e}"
logger.error(error_msg)
if progress_callback:
progress_callback(f"Error: {error_msg}", -1)
raise PackageManagerError(error_msg) from e
def remove_packages(self, packages: List[str], live_remove: bool = False,
progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
"""
Remove packages using APT.
Args:
packages: List of package names to remove
live_remove: Whether to perform live removal
progress_callback: Optional callback for progress updates
Returns:
Dictionary with removal results
Raises:
PackageManagerError: If package removal fails
"""
try:
logger.info(f"Removing packages: {packages}")
if progress_callback:
progress_callback(f"Removing packages: {', '.join(packages)}", 0)
# Mark packages for removal
for package in packages:
if package in self.cache:
pkg = self.cache[package]
if not pkg.is_installed:
logger.info(f"Package {package} is not installed")
continue
pkg.mark_delete()
logger.info(f"Marked {package} for removal")
else:
raise PackageManagerError(f"Package {package} not found in cache")
# Commit changes
if progress_callback:
progress_callback("Committing package changes...", 50)
self.cache.commit()
if progress_callback:
progress_callback("Package removal completed", 100)
logger.info("Package removal completed successfully")
return {
"success": True,
"packages": packages,
"message": "Packages removed successfully"
}
except Exception as e:
error_msg = f"Failed to remove packages {packages}: {e}"
logger.error(error_msg)
if progress_callback:
progress_callback(f"Error: {error_msg}", -1)
raise PackageManagerError(error_msg) from e
def upgrade_system(self, progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
"""
Perform full system upgrade.
Args:
progress_callback: Optional callback for progress updates
Returns:
Dictionary with upgrade results
Raises:
PackageManagerError: If system upgrade fails
"""
try:
logger.info("Starting system upgrade")
if progress_callback:
progress_callback("Starting system upgrade...", 0)
# Update package lists first
self.update_package_lists(progress_callback)
if progress_callback:
progress_callback("Checking for upgrades...", 25)
# Mark all packages for upgrade
self.cache.upgrade()
if progress_callback:
progress_callback("Committing upgrades...", 75)
# Commit changes
self.cache.commit()
if progress_callback:
progress_callback("System upgrade completed", 100)
logger.info("System upgrade completed successfully")
return {
"success": True,
"message": "System upgraded successfully"
}
except Exception as e:
error_msg = f"Failed to upgrade system: {e}"
logger.error(error_msg)
if progress_callback:
progress_callback(f"Error: {error_msg}", -1)
raise PackageManagerError(error_msg) from e
def search_packages(self, query: str) -> List[Dict[str, Any]]:
"""
Search for packages matching the query.
Args:
query: Search query string
Returns:
List of package information dictionaries
Raises:
PackageManagerError: If package search fails
"""
try:
logger.info(f"Searching for packages: {query}")
results = []
for pkg in self.cache:
if query.lower() in pkg.name.lower() or query.lower() in pkg.get_fullname().lower():
results.append({
"name": pkg.name,
"version": pkg.installed.version if pkg.is_installed else pkg.candidate.version if pkg.candidate else None,
"description": pkg.installed.summary if pkg.is_installed else pkg.candidate.summary if pkg.candidate else None,
"installed": pkg.is_installed
})
logger.info(f"Found {len(results)} packages matching '{query}'")
return results
except Exception as e:
error_msg = f"Failed to search packages: {e}"
logger.error(error_msg)
raise PackageManagerError(error_msg) from e
def get_installed_packages(self) -> List[Dict[str, Any]]:
"""
Get list of currently installed packages.
Returns:
List of installed package information dictionaries
Raises:
PackageManagerError: If package listing fails
"""
try:
logger.info("Getting list of installed packages")
installed = []
for pkg in self.cache:
if pkg.is_installed:
installed.append({
"name": pkg.name,
"version": pkg.installed.version,
"description": pkg.installed.summary,
"architecture": pkg.installed.architecture
})
logger.info(f"Found {len(installed)} installed packages")
return installed
except Exception as e:
error_msg = f"Failed to get installed packages: {e}"
logger.error(error_msg)
raise PackageManagerError(error_msg) from e
def resolve_dependencies(self, packages: List[str]) -> Dict[str, Any]:
"""
Analyze dependencies for a set of packages.
Args:
packages: List of package names to analyze
Returns:
Dictionary with dependency analysis results
Raises:
PackageManagerError: If dependency resolution fails
"""
try:
logger.info(f"Resolving dependencies for packages: {packages}")
dependencies = {}
conflicts = {}
for package in packages:
if package in self.cache:
pkg = self.cache[package]
if pkg.candidate:
# Get dependencies
deps = []
for dep in pkg.candidate.dependencies:
deps.append({
"name": dep.name,
"relation": str(dep.relation),
"version": str(dep.version) if dep.version else None
})
dependencies[package] = deps
# Get conflicts
conflicts_list = []
for conflict in pkg.candidate.conflicts:
conflicts_list.append({
"name": conflict.name,
"relation": str(conflict.relation),
"version": str(conflict.version) if conflict.version else None
})
if conflicts_list:
conflicts[package] = conflicts_list
else:
raise PackageManagerError(f"Package {package} not found in repositories")
return {
"dependencies": dependencies,
"conflicts": conflicts
}
except Exception as e:
error_msg = f"Failed to resolve dependencies: {e}"
logger.error(error_msg)
raise PackageManagerError(error_msg) from e
def get_package_info(self, package: str) -> Dict[str, Any]:
"""
Get detailed information about a package.
Args:
package: Package name
Returns:
Dictionary with package information
Raises:
PackageManagerError: If package info retrieval fails
"""
try:
logger.info(f"Getting package info for: {package}")
if package not in self.cache:
raise PackageManagerError(f"Package {package} not found in repositories")
pkg = self.cache[package]
info = {
"name": pkg.name,
"installed": pkg.is_installed,
"candidate": pkg.candidate is not None
}
if pkg.is_installed:
info.update({
"installed_version": pkg.installed.version,
"installed_description": pkg.installed.summary,
"installed_architecture": pkg.installed.architecture,
"installed_size": pkg.installed.size,
"installed_section": pkg.installed.section
})
if pkg.candidate:
info.update({
"candidate_version": pkg.candidate.version,
"candidate_description": pkg.candidate.summary,
"candidate_architecture": pkg.candidate.architecture,
"candidate_size": pkg.candidate.size,
"candidate_section": pkg.candidate.section
})
return info
except Exception as e:
error_msg = f"Failed to get package info for {package}: {e}"
logger.error(error_msg)
raise PackageManagerError(error_msg) from e
def get_package_architecture(self, package: str) -> str:
"""
Get the architecture of a package.
Args:
package: Package name
Returns:
Package architecture string
Raises:
PackageManagerError: If package architecture retrieval fails
"""
try:
info = self.get_package_info(package)
if info["installed"]:
return info["installed_architecture"]
elif info["candidate"]:
return info["candidate_architecture"]
else:
raise PackageManagerError(f"No version available for package {package}")
except Exception as e:
error_msg = f"Failed to get package architecture for {package}: {e}"
logger.error(error_msg)
raise PackageManagerError(error_msg) from e
def get_package_version(self, package: str) -> str:
"""
Get the version of a package.
Args:
package: Package name
Returns:
Package version string
Raises:
PackageManagerError: If package version retrieval fails
"""
try:
info = self.get_package_info(package)
if info["installed"]:
return info["installed_version"]
elif info["candidate"]:
return info["candidate_version"]
else:
raise PackageManagerError(f"No version available for package {package}")
except Exception as e:
error_msg = f"Failed to get package version for {package}: {e}"
logger.error(error_msg)
raise PackageManagerError(error_msg) from e