425 lines
16 KiB
Python
425 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Debian Package Converter
|
|
|
|
This module handles package format conversions between RPM and Debian formats,
|
|
ensuring compatibility when migrating from Fedora to Debian systems.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
import logging
|
|
import re
|
|
|
|
@dataclass
class PackageInfo:
    """Plain record describing one package.

    All fields are required and are stored exactly as passed in; the class
    adds no validation or normalisation of its own.
    """

    # Package name.
    name: str
    # Version string, kept verbatim.
    version: str
    # Architecture label, kept verbatim.
    architecture: str
    # Packaging format of this record: "rpm" or "deb".
    format: str
    # Dependency entries, one string per dependency.
    dependencies: List[str]
    # Entries from the package's "provides" relation.
    provides: List[str]
    # Entries from the package's "conflicts" relation.
    conflicts: List[str]
    # Human-readable description text.
    description: str
|
|
|
|
@dataclass
class ConversionResult:
    """Plain record carrying the outcome of one package conversion.

    All fields are required; the two ``Optional`` fields accept ``None``.
    """

    # True when the conversion succeeded.
    success: bool
    # Identifier of the package that was converted.
    original_package: str
    # Identifier of the converted package, or None.
    # NOTE(review): presumably None when the conversion failed - confirm with callers.
    converted_package: Optional[str]
    # Error text, or None.
    # NOTE(review): presumably None on success - confirm with callers.
    error: Optional[str]
    # Warning messages collected for this conversion.
    warnings: List[str]
    # Free-form extra data attached to the result.
    metadata: Dict[str, Any]
|
|
|
|
class DebianPackageConverter:
|
|
"""Converts packages between RPM and Debian formats"""
|
|
|
|
def __init__(self, config_dir: str = "./config/converter"):
|
|
self.config_dir = Path(config_dir)
|
|
self.config_dir.mkdir(parents=True, exist_ok=True)
|
|
self.logger = self._setup_logging()
|
|
|
|
# Package name mappings (RPM -> Debian)
|
|
self.package_mappings = self._load_package_mappings()
|
|
|
|
# Repository mappings
|
|
self.repo_mappings = {
|
|
"rpmfusion": "debian-backports",
|
|
"copr": "ppa",
|
|
"epel": "debian-backports"
|
|
}
|
|
|
|
def _setup_logging(self) -> logging.Logger:
|
|
"""Setup logging for the converter"""
|
|
logger = logging.getLogger('debian-package-converter')
|
|
logger.setLevel(logging.INFO)
|
|
|
|
if not logger.handlers:
|
|
handler = logging.FileHandler(self.config_dir / "converter.log")
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
handler.setFormatter(formatter)
|
|
logger.addHandler(handler)
|
|
|
|
return logger
|
|
|
|
def _load_package_mappings(self) -> Dict[str, str]:
|
|
"""Load package name mappings from configuration"""
|
|
mappings_file = self.config_dir / "package_mappings.json"
|
|
|
|
if mappings_file.exists():
|
|
with open(mappings_file, 'r') as f:
|
|
return json.load(f)
|
|
else:
|
|
# Default mappings for common packages
|
|
default_mappings = {
|
|
# Development tools
|
|
"gcc": "gcc",
|
|
"gcc-c++": "g++",
|
|
"make": "make",
|
|
"cmake": "cmake",
|
|
"autoconf": "autoconf",
|
|
"automake": "automake",
|
|
"libtool": "libtool",
|
|
"pkg-config": "pkg-config",
|
|
|
|
# System tools
|
|
"systemd": "systemd",
|
|
"systemd-sysv": "systemd-sysv",
|
|
"udev": "udev",
|
|
"dbus": "dbus",
|
|
|
|
# Desktop environments
|
|
"gnome-shell": "gnome-shell",
|
|
"gnome-session": "gnome-session",
|
|
"gnome-control-center": "gnome-control-center",
|
|
"kde-workspace": "kde-plasma-workspace",
|
|
"kde-runtime": "kde-runtime",
|
|
|
|
# Common libraries
|
|
"glib2": "libglib2.0-0",
|
|
"glib2-devel": "libglib2.0-dev",
|
|
"gtk3": "libgtk-3-0",
|
|
"gtk3-devel": "libgtk-3-dev",
|
|
"qt5-qtbase": "qtbase5-dev",
|
|
"qt5-qtbase-devel": "qtbase5-dev",
|
|
|
|
# Network tools
|
|
"curl": "curl",
|
|
"wget": "wget",
|
|
"openssh": "openssh-client",
|
|
"openssh-server": "openssh-server",
|
|
|
|
# Text editors
|
|
"vim": "vim",
|
|
"emacs": "emacs",
|
|
"nano": "nano",
|
|
|
|
# Shells
|
|
"bash": "bash",
|
|
"zsh": "zsh",
|
|
"fish": "fish",
|
|
|
|
# Package managers
|
|
"dnf": "apt",
|
|
"yum": "apt",
|
|
"rpm": "dpkg"
|
|
}
|
|
|
|
# Save default mappings
|
|
with open(mappings_file, 'w') as f:
|
|
json.dump(default_mappings, f, indent=2)
|
|
|
|
return default_mappings
|
|
|
|
def convert_package_name(self, rpm_name: str) -> str:
|
|
"""Convert an RPM package name to its Debian equivalent"""
|
|
# Remove RPM-specific suffixes
|
|
clean_name = rpm_name.replace("-devel", "").replace("-debuginfo", "").replace("-doc", "")
|
|
|
|
# Check if we have a direct mapping
|
|
if clean_name in self.package_mappings:
|
|
return self.package_mappings[clean_name]
|
|
|
|
# Handle common patterns
|
|
if clean_name.endswith("-devel"):
|
|
base_name = clean_name[:-6]
|
|
if base_name in self.package_mappings:
|
|
return f"{self.package_mappings[base_name]}-dev"
|
|
|
|
# Handle library packages
|
|
if clean_name.startswith("lib") and clean_name.endswith("-devel"):
|
|
lib_name = clean_name[3:-6]
|
|
return f"lib{lib_name}-dev"
|
|
|
|
# Handle Python packages
|
|
if clean_name.startswith("python3-"):
|
|
return clean_name # Python packages often have the same names
|
|
|
|
# Handle Perl packages
|
|
if clean_name.startswith("perl-"):
|
|
perl_name = clean_name[5:]
|
|
return f"lib{perl_name}-perl"
|
|
|
|
# If no mapping found, return the original name
|
|
self.logger.warning(f"No mapping found for package: {rpm_name}")
|
|
return clean_name
|
|
|
|
def convert_repository(self, rpm_repo: str) -> str:
    """Convert an RPM repository identifier to its Debian equivalent.

    Rules, in order:

    * ``copr:<user>:<project>[:...]`` becomes ``ppa:<user>/<project>``.
    * ``copr:<user>/<project>`` becomes ``ppa:<user>/<project>``.
    * any identifier containing ``rpmfusion`` or ``epel`` becomes
      ``debian-backports``.
    * everything else is returned unchanged.

    Args:
        rpm_repo: Repository identifier from the RPM side.

    Returns:
        The converted identifier, or ``rpm_repo`` itself when no rule applies.
    """
    if rpm_repo.startswith("copr:"):
        copr_parts = rpm_repo.split(":")
        if len(copr_parts) >= 3:
            return f"ppa:{copr_parts[1]}/{copr_parts[2]}"

        # Slash-separated form, i.e. "copr:" followed by "user/project".
        # Without this branch such an entry is returned unchanged and the
        # caller cannot recognise it as converted.
        user, slash, project = copr_parts[1].partition("/")
        if slash and user and project:
            return f"ppa:{user}/{project}"

    # RPM Fusion and EPEL both map to backports.
    if "rpmfusion" in rpm_repo or "epel" in rpm_repo:
        return "debian-backports"

    # If no mapping is found, return the original.
    return rpm_repo
|
|
|
|
def convert_package_spec(self, rpm_spec: str) -> str:
|
|
"""Convert an RPM package specification to Debian format"""
|
|
# Handle URL specifications
|
|
if rpm_spec.startswith("http") and rpm_spec.endswith(".rpm"):
|
|
return rpm_spec.replace(".rpm", ".deb")
|
|
|
|
# Handle package names
|
|
if not rpm_spec.startswith("http"):
|
|
return self.convert_package_name(rpm_spec)
|
|
|
|
return rpm_spec
|
|
|
|
def convert_dependencies(self, rpm_deps: List[str]) -> List[str]:
|
|
"""Convert RPM dependencies to Debian equivalents"""
|
|
converted_deps = []
|
|
|
|
for dep in rpm_deps:
|
|
# Handle version constraints
|
|
if ">=" in dep:
|
|
parts = dep.split(">=")
|
|
pkg_name = self.convert_package_name(parts[0])
|
|
version = parts[1]
|
|
converted_deps.append(f"{pkg_name} (>= {version})")
|
|
elif "<=" in dep:
|
|
parts = dep.split("<=")
|
|
pkg_name = self.convert_package_name(parts[0])
|
|
version = parts[1]
|
|
converted_deps.append(f"{pkg_name} (<= {version})")
|
|
elif "=" in dep and not dep.startswith("="):
|
|
parts = dep.split("=")
|
|
pkg_name = self.convert_package_name(parts[0])
|
|
version = parts[1]
|
|
converted_deps.append(f"{pkg_name} (= {version})")
|
|
else:
|
|
converted_deps.append(self.convert_package_name(dep))
|
|
|
|
return converted_deps
|
|
|
|
def convert_group_to_task(self, rpm_group: str) -> str:
    """Convert an RPM group name to a Debian task name.

    Every known group currently maps to a task of the same name, and an
    unknown group is returned unchanged.

    Args:
        rpm_group: RPM group name.

    Returns:
        The task name for ``rpm_group``.
    """
    known_groups = (
        "development-tools",
        "system-tools",
        "network-tools",
        "graphical-internet",
        "text-internet",
        "office",
        "graphics",
        "sound-and-video",
        "games",
        "education",
        "scientific",
        "documentation",
    )
    task_by_group = {group: group for group in known_groups}
    return task_by_group.get(rpm_group, rpm_group)
|
|
|
|
def convert_module_config(self, rpm_config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Convert an entire RPM module configuration to Debian format"""
|
|
deb_config = rpm_config.copy()
|
|
|
|
# Convert module type
|
|
if deb_config.get("type") == "dnf":
|
|
deb_config["type"] = "apt"
|
|
elif deb_config.get("type") == "rpm-ostree":
|
|
deb_config["type"] = "apt-ostree"
|
|
elif deb_config.get("type") == "mock":
|
|
deb_config["type"] = "deb-mock"
|
|
|
|
# Convert repositories
|
|
if "repos" in deb_config:
|
|
if "copr" in deb_config["repos"]:
|
|
deb_config["repos"]["ppa"] = []
|
|
for copr in deb_config["repos"]["copr"]:
|
|
ppa = self.convert_repository(f"copr:{copr}")
|
|
if ppa.startswith("ppa:"):
|
|
deb_config["repos"]["ppa"].append(ppa)
|
|
del deb_config["repos"]["copr"]
|
|
|
|
if "nonfree" in deb_config["repos"]:
|
|
if deb_config["repos"]["nonfree"] == "rpmfusion":
|
|
deb_config["repos"]["backports"] = True
|
|
del deb_config["repos"]["nonfree"]
|
|
|
|
# Convert package installations
|
|
if "install" in deb_config and "packages" in deb_config["install"]:
|
|
packages = deb_config["install"]["packages"]
|
|
converted_packages = []
|
|
|
|
for pkg in packages:
|
|
if isinstance(pkg, str):
|
|
converted_packages.append(self.convert_package_spec(pkg))
|
|
elif isinstance(pkg, dict) and "packages" in pkg:
|
|
converted_pkg = pkg.copy()
|
|
converted_pkg["packages"] = [self.convert_package_name(p) for p in pkg["packages"]]
|
|
converted_packages.append(converted_pkg)
|
|
else:
|
|
converted_packages.append(pkg)
|
|
|
|
deb_config["install"]["packages"] = converted_packages
|
|
|
|
# Convert package removals
|
|
if "remove" in deb_config and "packages" in deb_config["remove"]:
|
|
deb_config["remove"]["packages"] = [
|
|
self.convert_package_name(pkg) for pkg in deb_config["remove"]["packages"]
|
|
]
|
|
|
|
# Convert groups to tasks
|
|
if "group-install" in deb_config:
|
|
deb_config["task-install"] = {
|
|
"packages": [self.convert_group_to_task(group) for group in deb_config["group-install"]["packages"]],
|
|
"with-optional": deb_config["group-install"].get("with-optional", False)
|
|
}
|
|
del deb_config["group-install"]
|
|
|
|
if "group-remove" in deb_config:
|
|
deb_config["task-remove"] = {
|
|
"packages": [self.convert_group_to_task(group) for group in deb_config["group-remove"]["packages"]]
|
|
}
|
|
del deb_config["group-remove"]
|
|
|
|
return deb_config
|
|
|
|
def validate_debian_package(self, package_name: str) -> bool:
    """Check whether a string is a syntactically valid Debian package name.

    A valid name is at least two characters long, starts with a lowercase
    letter or digit, and otherwise contains only lowercase letters, digits,
    ``+``, ``-`` and ``.``. The check is purely syntactic; it does not test
    whether the package exists.

    Args:
        package_name: Candidate package name.

    Returns:
        True when the whole string matches the rule above, False otherwise
        (including for an empty or ``None`` name).
    """
    if not package_name:
        return False

    # fullmatch anchors at the true end of the string. With match and "$",
    # a name followed by a trailing newline would still validate.
    return re.fullmatch(r'[a-z0-9][a-z0-9+\-.]+', package_name) is not None
|
|
|
|
def get_package_info(self, package_name: str, format_type: str = "deb") -> Optional[PackageInfo]:
|
|
"""Get information about a package"""
|
|
try:
|
|
if format_type == "deb":
|
|
# Use apt-cache for Debian packages
|
|
result = subprocess.run(
|
|
["apt-cache", "show", package_name],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
)
|
|
|
|
# Parse apt-cache output
|
|
info = self._parse_apt_cache_output(result.stdout)
|
|
return info
|
|
|
|
elif format_type == "rpm":
|
|
# Use rpm for RPM packages
|
|
result = subprocess.run(
|
|
["rpm", "-qi", package_name],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
)
|
|
|
|
# Parse rpm output
|
|
info = self._parse_rpm_output(result.stdout)
|
|
return info
|
|
|
|
except subprocess.CalledProcessError:
|
|
self.logger.warning(f"Package not found: {package_name}")
|
|
return None
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting package info: {e}")
|
|
return None
|
|
|
|
def _parse_apt_cache_output(self, output: str) -> PackageInfo:
    """Parse ``apt-cache show`` output into a ``PackageInfo``.

    Only the first stanza (the block of fields up to the first blank line)
    is read, so fields belonging to different package versions are never
    mixed into one record. Fields that are absent keep their empty default.

    Args:
        output: Text printed by ``apt-cache show``.

    Returns:
        A ``PackageInfo`` with ``format`` set to ``"deb"``.
    """
    info = {
        "name": "",
        "version": "",
        "architecture": "",
        "format": "deb",
        "dependencies": [],
        "provides": [],
        "conflicts": [],
        "description": ""
    }
    scalar_fields = {
        "Package": "name",
        "Version": "version",
        "Architecture": "architecture",
    }
    list_fields = {
        "Depends": "dependencies",
        "Provides": "provides",
        "Conflicts": "conflicts",
    }

    seen_field = False
    for line in output.splitlines():
        if not line.strip():
            if seen_field:
                # A blank line ends the first stanza.
                break
            continue
        if line[0].isspace():
            # Continuation line of a multi-line field, e.g. the long description.
            continue

        key, colon, value = line.partition(":")
        if not colon:
            continue
        seen_field = True
        value = value.strip()

        if key in scalar_fields:
            info[scalar_fields[key]] = value
        elif key in list_fields:
            # Drop empty items so an empty field yields [] rather than [""].
            info[list_fields[key]] = [item.strip() for item in value.split(",") if item.strip()]
        elif key == "Description" or (key.startswith("Description-") and key != "Description-md5"):
            # The synopsis may appear under a language-suffixed name such as
            # "Description-en"; the first one found is kept.
            if not info["description"]:
                info["description"] = value

    return PackageInfo(**info)
|
|
|
|
def _parse_rpm_output(self, output: str) -> PackageInfo:
    """Parse ``rpm -qi`` output into a ``PackageInfo``.

    Reads the ``Name``, ``Version``, ``Architecture`` and ``Summary`` lines
    (the summary becomes ``description``). Parsing stops at the
    ``Description`` label so the free text after it cannot be mistaken for
    a field. The list fields are always left empty.

    Args:
        output: Text printed by ``rpm -qi``.

    Returns:
        A ``PackageInfo`` with ``format`` set to ``"rpm"``.
    """
    info = {
        "name": "",
        "version": "",
        "architecture": "",
        "format": "rpm",
        "dependencies": [],
        "provides": [],
        "conflicts": [],
        "description": ""
    }
    field_map = {
        "Name": "name",
        "Version": "version",
        "Architecture": "architecture",
        "Summary": "description",
    }

    for line in output.splitlines():
        label, colon, value = line.partition(":")
        if not colon:
            # Not a "Label : value" line. Indexing the result of split(":")
            # would raise IndexError on such a line.
            continue

        label = label.strip()
        if label == "Description":
            # NOTE(review): assumes Description is the last labelled field
            # of a record - confirm against the rpm versions in use.
            break

        # Exact label comparison, so a line that merely starts with a known
        # label is not taken for that field.
        target = field_map.get(label)
        if target is not None:
            info[target] = value.strip()

    return PackageInfo(**info)
|