deb-mock/deb_mock/core.py
2025-08-03 22:16:04 +00:00

482 lines
No EOL
18 KiB
Python

"""
Core DebMock class for orchestrating the build process
"""
import os
import json
import shutil
from pathlib import Path
from typing import Dict, Any, Optional, List
from .config import Config
from .chroot import ChrootManager
from .sbuild import SbuildWrapper
from .metadata import MetadataManager
from .cache import CacheManager
from .exceptions import DebMockError, BuildError, ChrootError, SbuildError
class DebMock:
"""Main DebMock class for orchestrating package builds"""
def __init__(self, config: Config):
self.config = config
self.chroot_manager = ChrootManager(config)
self.sbuild_wrapper = SbuildWrapper(config)
self.metadata_manager = MetadataManager(config)
self.cache_manager = CacheManager(config)
# Validate configuration
self.config.validate()
# Setup caches
self._setup_caches()
def _setup_caches(self) -> None:
"""Setup cache directories and ccache"""
try:
# Setup ccache if enabled
if self.config.use_ccache:
self.cache_manager.setup_ccache()
except Exception as e:
# Log warning but continue
print(f"Warning: Failed to setup caches: {e}")
def build(self, source_package: str, **kwargs) -> Dict[str, Any]:
"""Build a Debian source package in an isolated environment"""
# Ensure chroot exists
chroot_name = kwargs.get('chroot_name', self.config.chroot_name)
chroot_path = self.config.get_chroot_path()
# Try to restore from cache first
if not self.chroot_manager.chroot_exists(chroot_name):
if not self.cache_manager.restore_root_cache(chroot_path):
self.chroot_manager.create_chroot(chroot_name)
# Check build dependencies
deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
if not deps_check['satisfied']:
# Try to install missing dependencies
if deps_check['missing']:
self.sbuild_wrapper.install_build_dependencies(deps_check['missing'], chroot_name)
# Setup build environment
build_env = self.config.setup_build_environment()
# Build the package
build_result = self.sbuild_wrapper.build_package(
source_package,
chroot_name,
build_env=build_env,
**kwargs
)
# Create cache after successful build
if build_result.get('success', False):
self.cache_manager.create_root_cache(chroot_path)
# Capture and store metadata
metadata = self._capture_build_metadata(build_result, source_package)
self.metadata_manager.store_metadata(metadata)
# Clean up chroot if not keeping it
if not kwargs.get('keep_chroot', self.config.keep_chroot):
self.chroot_manager.clean_chroot(chroot_name)
return build_result
def build_chain(self, source_packages: List[str], **kwargs) -> List[Dict[str, Any]]:
"""Build a chain of packages that depend on each other (similar to Mock's --chain)"""
results = []
chroot_name = kwargs.get('chroot_name', self.config.chroot_name)
chroot_path = self.config.get_chroot_path()
# Try to restore from cache first
if not self.chroot_manager.chroot_exists(chroot_name):
if not self.cache_manager.restore_root_cache(chroot_path):
self.chroot_manager.create_chroot(chroot_name)
# Setup build environment
build_env = self.config.setup_build_environment()
for i, source_package in enumerate(source_packages):
try:
# Build the package
result = self.sbuild_wrapper.build_package(
source_package,
chroot_name,
build_env=build_env,
**kwargs
)
results.append({
'package': source_package,
'success': True,
'result': result,
'order': i + 1
})
# Install the built package in the chroot for subsequent builds
if result.get('artifacts'):
self._install_built_package(result['artifacts'], chroot_name)
except Exception as e:
results.append({
'package': source_package,
'success': False,
'error': str(e),
'order': i + 1
})
# Stop chain on failure unless continue_on_failure is specified
if not kwargs.get('continue_on_failure', False):
break
# Create cache after successful chain build
if any(r['success'] for r in results):
self.cache_manager.create_root_cache(chroot_path)
return results
def _install_built_package(self, artifacts: List[str], chroot_name: str) -> None:
"""Install a built package in the chroot for chain building"""
# Find .deb files in artifacts
deb_files = [art for art in artifacts if art.endswith('.deb')]
if not deb_files:
return
# Copy .deb files to chroot and install them
for deb_file in deb_files:
try:
# Copy to chroot
chroot_deb_path = f"/tmp/{os.path.basename(deb_file)}"
self.chroot_manager.copy_to_chroot(deb_file, chroot_deb_path, chroot_name)
# Install in chroot
self.chroot_manager.execute_in_chroot(
chroot_name,
['dpkg', '-i', chroot_deb_path],
capture_output=False
)
# Clean up
self.chroot_manager.execute_in_chroot(
chroot_name,
['rm', '-f', chroot_deb_path],
capture_output=False
)
except Exception as e:
# Log warning but continue
print(f"Warning: Failed to install {deb_file} in chroot: {e}")
def init_chroot(self, chroot_name: str, arch: str = None, suite: str = None) -> None:
"""Initialize a new chroot environment"""
self.chroot_manager.create_chroot(chroot_name, arch, suite)
# Create cache after successful chroot creation
chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
self.cache_manager.create_root_cache(chroot_path)
def clean_chroot(self, chroot_name: str) -> None:
"""Clean up a chroot environment"""
self.chroot_manager.clean_chroot(chroot_name)
def list_chroots(self) -> list:
"""List available chroot environments"""
return self.chroot_manager.list_chroots()
def update_chroot(self, chroot_name: str) -> None:
"""Update packages in a chroot environment"""
self.chroot_manager.update_chroot(chroot_name)
# Update cache after successful update
chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
self.cache_manager.create_root_cache(chroot_path)
def get_chroot_info(self, chroot_name: str) -> dict:
"""Get information about a chroot environment"""
return self.chroot_manager.get_chroot_info(chroot_name)
def shell(self, chroot_name: str = None) -> None:
"""Open a shell in the chroot environment (similar to Mock's --shell)"""
if chroot_name is None:
chroot_name = self.config.chroot_name
if not self.chroot_manager.chroot_exists(chroot_name):
raise ChrootError(f"Chroot '{chroot_name}' does not exist")
# Execute shell in chroot
self.chroot_manager.execute_in_chroot(
chroot_name,
['/bin/bash'],
capture_output=False
)
def copyout(self, source_path: str, dest_path: str, chroot_name: str = None) -> None:
"""Copy files from chroot to host (similar to Mock's --copyout)"""
if chroot_name is None:
chroot_name = self.config.chroot_name
self.chroot_manager.copy_from_chroot(source_path, dest_path, chroot_name)
def copyin(self, source_path: str, dest_path: str, chroot_name: str = None) -> None:
"""Copy files from host to chroot (similar to Mock's --copyin)"""
if chroot_name is None:
chroot_name = self.config.chroot_name
self.chroot_manager.copy_to_chroot(source_path, dest_path, chroot_name)
def cleanup_caches(self) -> Dict[str, int]:
"""Clean up old cache files (similar to Mock's cache management)"""
return self.cache_manager.cleanup_old_caches()
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
return self.cache_manager.get_cache_stats()
def _capture_build_metadata(self, build_result: Dict[str, Any], source_package: str) -> Dict[str, Any]:
"""Capture comprehensive build metadata"""
metadata = {
'source_package': source_package,
'build_result': build_result,
'config': self.config.to_dict(),
'artifacts': build_result.get('artifacts', []),
'build_metadata': build_result.get('metadata', {}),
'timestamp': self._get_timestamp(),
'build_success': build_result.get('success', False),
'cache_info': self.get_cache_stats()
}
# Add artifact details
metadata['artifact_details'] = self._get_artifact_details(build_result.get('artifacts', []))
return metadata
def _get_timestamp(self) -> str:
"""Get current timestamp"""
from datetime import datetime
return datetime.now().isoformat()
def _get_artifact_details(self, artifacts: list) -> list:
"""Get detailed information about build artifacts"""
details = []
for artifact_path in artifacts:
if os.path.exists(artifact_path):
stat = os.stat(artifact_path)
details.append({
'path': artifact_path,
'name': os.path.basename(artifact_path),
'size': stat.st_size,
'modified': stat.st_mtime,
'type': self._get_artifact_type(artifact_path)
})
return details
def _get_artifact_type(self, artifact_path: str) -> str:
"""Determine the type of build artifact"""
ext = Path(artifact_path).suffix.lower()
if ext == '.deb':
return 'deb_package'
elif ext == '.changes':
return 'changes_file'
elif ext == '.buildinfo':
return 'buildinfo_file'
elif ext == '.dsc':
return 'source_package'
else:
return 'other'
def verify_reproducible_build(self, source_package: str, **kwargs) -> Dict[str, Any]:
"""Verify that a build is reproducible by building twice and comparing results"""
# First build
result1 = self.build(source_package, **kwargs)
# Clean chroot for second build
chroot_name = kwargs.get('chroot_name', self.config.chroot_name)
if self.chroot_manager.chroot_exists(chroot_name):
self.chroot_manager.clean_chroot(chroot_name)
# Second build
result2 = self.build(source_package, **kwargs)
# Compare results
comparison = self._compare_build_results(result1, result2)
return {
'reproducible': comparison['identical'],
'first_build': result1,
'second_build': result2,
'comparison': comparison
}
def _compare_build_results(self, result1: Dict[str, Any], result2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare two build results for reproducibility"""
comparison = {
'identical': True,
'differences': [],
'artifact_comparison': {}
}
# Compare artifacts
artifacts1 = set(result1.get('artifacts', []))
artifacts2 = set(result2.get('artifacts', []))
if artifacts1 != artifacts2:
comparison['identical'] = False
comparison['differences'].append('Different artifacts produced')
# Compare individual artifacts
common_artifacts = artifacts1.intersection(artifacts2)
for artifact in common_artifacts:
if os.path.exists(artifact):
# Compare file hashes
hash1 = self._get_file_hash(artifact)
hash2 = self._get_file_hash(artifact)
comparison['artifact_comparison'][artifact] = {
'identical': hash1 == hash2,
'hash1': hash1,
'hash2': hash2
}
if hash1 != hash2:
comparison['identical'] = False
comparison['differences'].append(f'Artifact {artifact} differs')
return comparison
def _get_file_hash(self, file_path: str) -> str:
"""Get SHA256 hash of a file"""
import hashlib
hash_sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
def get_build_history(self) -> list:
"""Get build history from metadata store"""
return self.metadata_manager.get_build_history()
def get_build_info(self, build_id: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific build"""
return self.metadata_manager.get_build_info(build_id)
def install_dependencies(self, source_package: str) -> Dict[str, Any]:
"""Install build dependencies for a source package"""
chroot_name = self.config.chroot_name
# Ensure chroot exists
if not self.chroot_manager.chroot_exists(chroot_name):
self.chroot_manager.create_chroot(chroot_name)
# Check and install dependencies
deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
if deps_check['missing']:
result = self.sbuild_wrapper.install_build_dependencies(deps_check['missing'], chroot_name)
return {
'success': True,
'installed': deps_check['missing'],
'details': result
}
else:
return {
'success': True,
'installed': [],
'message': 'All dependencies already satisfied'
}
def install_packages(self, packages: List[str]) -> Dict[str, Any]:
"""Install packages in the chroot environment"""
chroot_name = self.config.chroot_name
# Ensure chroot exists
if not self.chroot_manager.chroot_exists(chroot_name):
self.chroot_manager.create_chroot(chroot_name)
# Install packages using APT
result = self.chroot_manager.execute_in_chroot(
chroot_name,
f"{self.config.apt_install_command} {' '.join(packages)}",
as_root=True
)
return {
'success': result['returncode'] == 0,
'installed': packages,
'output': result['stdout'],
'error': result['stderr'] if result['returncode'] != 0 else None
}
def update_packages(self, packages: List[str] = None) -> Dict[str, Any]:
"""Update packages in the chroot environment"""
chroot_name = self.config.chroot_name
# Ensure chroot exists
if not self.chroot_manager.chroot_exists(chroot_name):
self.chroot_manager.create_chroot(chroot_name)
if packages:
# Update specific packages
cmd = f"{self.config.apt_command} install --only-upgrade {' '.join(packages)}"
else:
# Update all packages
cmd = f"{self.config.apt_command} update && {self.config.apt_command} upgrade -y"
result = self.chroot_manager.execute_in_chroot(chroot_name, cmd, as_root=True)
return {
'success': result['returncode'] == 0,
'updated': packages if packages else 'all',
'output': result['stdout'],
'error': result['stderr'] if result['returncode'] != 0 else None
}
def remove_packages(self, packages: List[str]) -> Dict[str, Any]:
"""Remove packages from the chroot environment"""
chroot_name = self.config.chroot_name
# Ensure chroot exists
if not self.chroot_manager.chroot_exists(chroot_name):
self.chroot_manager.create_chroot(chroot_name)
# Remove packages using APT
cmd = f"{self.config.apt_command} remove -y {' '.join(packages)}"
result = self.chroot_manager.execute_in_chroot(chroot_name, cmd, as_root=True)
return {
'success': result['returncode'] == 0,
'removed': packages,
'output': result['stdout'],
'error': result['stderr'] if result['returncode'] != 0 else None
}
def execute_apt_command(self, command: str) -> Dict[str, Any]:
"""Execute APT command in the chroot environment"""
chroot_name = self.config.chroot_name
# Ensure chroot exists
if not self.chroot_manager.chroot_exists(chroot_name):
self.chroot_manager.create_chroot(chroot_name)
# Execute APT command
cmd = f"{self.config.apt_command} {command}"
result = self.chroot_manager.execute_in_chroot(chroot_name, cmd, as_root=True)
return {
'success': result['returncode'] == 0,
'command': command,
'output': result['stdout'],
'error': result['stderr'] if result['returncode'] != 0 else None
}