"""
Core DebMock class for orchestrating the build process
"""

import hashlib
import json
import os
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, List

from .config import Config
from .chroot import ChrootManager
from .sbuild import SbuildWrapper
from .metadata import MetadataManager
from .cache import CacheManager
from .exceptions import DebMockError, BuildError, ChrootError, SbuildError


class DebMock:
    """Main DebMock class for orchestrating package builds.

    Wires together the chroot, sbuild, metadata and cache managers that are
    all created from one ``Config`` object, and exposes the high-level
    operations (single build, chain build, chroot maintenance, APT helpers
    and a reproducibility check) on top of them.
    """

    # Maps a lower-cased file suffix to the artifact type label reported in
    # build metadata; any suffix not listed here is reported as 'other'.
    _ARTIFACT_TYPES = {
        '.deb': 'deb_package',
        '.changes': 'changes_file',
        '.buildinfo': 'buildinfo_file',
        '.dsc': 'source_package',
    }

    def __init__(self, config: "Config"):
        """Create the managers, validate the configuration and set up caches.

        Args:
            config: Configuration object shared by every manager.
        """
        self.config = config
        self.chroot_manager = ChrootManager(config)
        self.sbuild_wrapper = SbuildWrapper(config)
        self.metadata_manager = MetadataManager(config)
        self.cache_manager = CacheManager(config)

        # Validate configuration
        self.config.validate()

        # Setup caches
        self._setup_caches()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _setup_caches(self) -> None:
        """Set up ccache when the configuration enables it.

        Cache setup is best-effort: any failure is reported as a warning on
        stdout and construction continues.
        """
        try:
            if self.config.use_ccache:
                self.cache_manager.setup_ccache()
        except Exception as e:
            # Log warning but continue
            print(f"Warning: Failed to setup caches: {e}")

    def _resolve_chroot_path(self, chroot_name: str) -> str:
        """Return the filesystem path used for the root cache of *chroot_name*.

        The configured default chroot keeps using ``config.get_chroot_path()``.
        An overridden name is resolved under ``config.chroot_dir`` the same way
        ``init_chroot`` and ``update_chroot`` do, so the cache is never
        restored to or created from a different chroot than the one in use.
        """
        if chroot_name == self.config.chroot_name:
            return self.config.get_chroot_path()
        return os.path.join(self.config.chroot_dir, chroot_name)

    def _ensure_chroot(self, chroot_name: str, chroot_path: str) -> None:
        """Make sure *chroot_name* exists, preferring a root-cache restore.

        A fresh chroot is only created when the chroot is missing and the
        cache manager reports that nothing could be restored.
        """
        if self.chroot_manager.chroot_exists(chroot_name):
            return
        if not self.cache_manager.restore_root_cache(chroot_path):
            self.chroot_manager.create_chroot(chroot_name)

    def _ensure_default_chroot(self) -> str:
        """Create the configured default chroot if missing; return its name."""
        chroot_name = self.config.chroot_name
        if not self.chroot_manager.chroot_exists(chroot_name):
            self.chroot_manager.create_chroot(chroot_name)
        return chroot_name

    @staticmethod
    def _forwarded_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Return *kwargs* without ``chroot_name``.

        The chroot name is already handed to ``build_package`` positionally;
        forwarding it again as a keyword could bind the same parameter twice.
        """
        return {key: value for key, value in kwargs.items() if key != 'chroot_name'}

    @staticmethod
    def _command_report(result: Dict[str, Any], key: str, value: Any) -> Dict[str, Any]:
        """Shape an ``execute_in_chroot`` result into the public report dict.

        Args:
            result: Mapping providing ``returncode``, ``stdout`` and, on
                failure, ``stderr``.
            key: Name of the operation-specific entry ('installed',
                'updated', 'removed' or 'command').
            value: Value stored under *key*.
        """
        failed = result['returncode'] != 0
        return {
            'success': not failed,
            key: value,
            'output': result['stdout'],
            'error': result['stderr'] if failed else None,
        }

    # ------------------------------------------------------------------
    # Building
    # ------------------------------------------------------------------

    def build(self, source_package: str, **kwargs) -> Dict[str, Any]:
        """Build a Debian source package in an isolated environment.

        Args:
            source_package: Source package handed to the sbuild wrapper.
            **kwargs: Optional ``chroot_name`` and ``keep_chroot`` overrides;
                everything except ``chroot_name`` is forwarded to
                ``SbuildWrapper.build_package``.

        Returns:
            The result dictionary produced by ``build_package``.
        """
        # A missing or None override falls back to the configured chroot.
        chroot_name = kwargs.get('chroot_name') or self.config.chroot_name
        chroot_path = self._resolve_chroot_path(chroot_name)

        # Ensure chroot exists (try to restore from cache first)
        self._ensure_chroot(chroot_name, chroot_path)

        # Check build dependencies and try to install the missing ones
        deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
        if not deps_check['satisfied'] and deps_check['missing']:
            self.sbuild_wrapper.install_build_dependencies(deps_check['missing'], chroot_name)

        # Setup build environment
        build_env = self.config.setup_build_environment()

        # Build the package
        build_result = self.sbuild_wrapper.build_package(
            source_package,
            chroot_name,
            build_env=build_env,
            **self._forwarded_kwargs(kwargs)
        )

        # Create cache after successful build
        if build_result.get('success', False):
            self.cache_manager.create_root_cache(chroot_path)

        # Capture and store metadata
        metadata = self._capture_build_metadata(build_result, source_package)
        self.metadata_manager.store_metadata(metadata)

        # Clean up chroot if not keeping it
        if not kwargs.get('keep_chroot', self.config.keep_chroot):
            self.chroot_manager.clean_chroot(chroot_name)

        return build_result

    def build_chain(self, source_packages: List[str], **kwargs) -> List[Dict[str, Any]]:
        """Build a chain of packages that depend on each other (similar to Mock's --chain).

        Packages are built in the given order inside one chroot. The ``.deb``
        artifacts of each successful build are installed into the chroot so
        later packages can build against them.

        Args:
            source_packages: Source packages in build order.
            **kwargs: Optional ``chroot_name`` override and
                ``continue_on_failure`` flag; everything except
                ``chroot_name`` is forwarded to ``build_package``.

        Returns:
            One entry per attempted package with ``package``, ``success`` and
            ``order`` (1-based), plus ``result`` when the build ran and
            ``error`` when it failed. The chain stops at the first failure
            unless ``continue_on_failure`` is truthy.
        """
        results = []
        chroot_name = kwargs.get('chroot_name') or self.config.chroot_name
        chroot_path = self._resolve_chroot_path(chroot_name)

        # Ensure chroot exists (try to restore from cache first)
        self._ensure_chroot(chroot_name, chroot_path)

        # Setup build environment
        build_env = self.config.setup_build_environment()
        forwarded = self._forwarded_kwargs(kwargs)
        continue_on_failure = kwargs.get('continue_on_failure', False)

        for order, source_package in enumerate(source_packages, start=1):
            try:
                result = self.sbuild_wrapper.build_package(
                    source_package,
                    chroot_name,
                    build_env=build_env,
                    **forwarded
                )
                # A build may fail without raising; honour the status it
                # reports. A result without the key counts as a success.
                succeeded = bool(result.get('success', True))
                entry = {
                    'package': source_package,
                    'success': succeeded,
                    'result': result,
                    'order': order
                }
                if not succeeded:
                    entry['error'] = str(result.get('error') or 'Build reported failure')
                elif result.get('artifacts'):
                    # Install the built package in the chroot for subsequent builds
                    self._install_built_package(result['artifacts'], chroot_name)
            except Exception as e:
                succeeded = False
                entry = {
                    'package': source_package,
                    'success': False,
                    'error': str(e),
                    'order': order
                }

            results.append(entry)

            # Stop chain on failure unless continue_on_failure is specified
            if not succeeded and not continue_on_failure:
                break

        # Create cache after the chain if at least one package was built
        if any(entry['success'] for entry in results):
            self.cache_manager.create_root_cache(chroot_path)

        return results

    def _install_built_package(self, artifacts: List[str], chroot_name: str) -> None:
        """Install the ``.deb`` artifacts of a build into the chroot.

        Each package is copied to ``/tmp`` inside the chroot, installed with
        ``dpkg -i`` and the copy removed again. A failure for one file is
        reported as a warning on stdout and does not stop the others.
        """
        deb_files = [art for art in artifacts if art.endswith('.deb')]

        for deb_file in deb_files:
            try:
                # Copy to chroot
                chroot_deb_path = f"/tmp/{os.path.basename(deb_file)}"
                self.chroot_manager.copy_to_chroot(deb_file, chroot_deb_path, chroot_name)

                # Install in chroot
                self.chroot_manager.execute_in_chroot(
                    chroot_name,
                    ['dpkg', '-i', chroot_deb_path],
                    capture_output=False
                )

                # Clean up
                self.chroot_manager.execute_in_chroot(
                    chroot_name,
                    ['rm', '-f', chroot_deb_path],
                    capture_output=False
                )
            except Exception as e:
                # Log warning but continue
                print(f"Warning: Failed to install {deb_file} in chroot: {e}")

    # ------------------------------------------------------------------
    # Chroot management
    # ------------------------------------------------------------------

    def init_chroot(self, chroot_name: str, arch: Optional[str] = None,
                    suite: Optional[str] = None) -> None:
        """Initialize a new chroot environment and snapshot it into the root cache."""
        self.chroot_manager.create_chroot(chroot_name, arch, suite)

        # Create cache after successful chroot creation
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
        self.cache_manager.create_root_cache(chroot_path)

    def clean_chroot(self, chroot_name: str) -> None:
        """Clean up a chroot environment"""
        self.chroot_manager.clean_chroot(chroot_name)

    def list_chroots(self) -> list:
        """List available chroot environments"""
        return self.chroot_manager.list_chroots()

    def update_chroot(self, chroot_name: str) -> None:
        """Update packages in a chroot environment and refresh its root cache."""
        self.chroot_manager.update_chroot(chroot_name)

        # Update cache after successful update
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
        self.cache_manager.create_root_cache(chroot_path)

    def get_chroot_info(self, chroot_name: str) -> dict:
        """Get information about a chroot environment"""
        return self.chroot_manager.get_chroot_info(chroot_name)

    def shell(self, chroot_name: Optional[str] = None) -> None:
        """Open a shell in the chroot environment (similar to Mock's --shell).

        Args:
            chroot_name: Chroot to enter; defaults to the configured chroot.

        Raises:
            ChrootError: If the chroot does not exist.
        """
        if chroot_name is None:
            chroot_name = self.config.chroot_name

        if not self.chroot_manager.chroot_exists(chroot_name):
            raise ChrootError(f"Chroot '{chroot_name}' does not exist")

        # Execute shell in chroot
        self.chroot_manager.execute_in_chroot(
            chroot_name,
            ['/bin/bash'],
            capture_output=False
        )

    def copyout(self, source_path: str, dest_path: str,
                chroot_name: Optional[str] = None) -> None:
        """Copy files from chroot to host (similar to Mock's --copyout)"""
        if chroot_name is None:
            chroot_name = self.config.chroot_name

        self.chroot_manager.copy_from_chroot(source_path, dest_path, chroot_name)

    def copyin(self, source_path: str, dest_path: str,
               chroot_name: Optional[str] = None) -> None:
        """Copy files from host to chroot (similar to Mock's --copyin)"""
        if chroot_name is None:
            chroot_name = self.config.chroot_name

        self.chroot_manager.copy_to_chroot(source_path, dest_path, chroot_name)

    # ------------------------------------------------------------------
    # Caches
    # ------------------------------------------------------------------

    def cleanup_caches(self) -> Dict[str, int]:
        """Clean up old cache files (similar to Mock's cache management)"""
        return self.cache_manager.cleanup_old_caches()

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        return self.cache_manager.get_cache_stats()

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------

    def _capture_build_metadata(self, build_result: Dict[str, Any],
                                source_package: str) -> Dict[str, Any]:
        """Assemble the metadata record stored for a build.

        The record combines the raw build result, the configuration dump, a
        timestamp, the current cache statistics and per-artifact file details.
        """
        artifacts = build_result.get('artifacts', [])
        return {
            'source_package': source_package,
            'build_result': build_result,
            'config': self.config.to_dict(),
            'artifacts': artifacts,
            'build_metadata': build_result.get('metadata', {}),
            'timestamp': self._get_timestamp(),
            'build_success': build_result.get('success', False),
            'cache_info': self.get_cache_stats(),
            'artifact_details': self._get_artifact_details(artifacts),
        }

    def _get_timestamp(self) -> str:
        """Return the current local time as an ISO 8601 string (no timezone)."""
        return datetime.now().isoformat()

    def _get_artifact_details(self, artifacts: list) -> list:
        """Return path, name, size, mtime and type for every readable artifact.

        Artifacts that cannot be stat'ed (for example because they do not
        exist) are left out of the returned list.
        """
        details = []

        for artifact_path in artifacts:
            try:
                # Stat directly instead of exists()+stat() so a file that
                # vanishes in between cannot raise.
                info = os.stat(artifact_path)
            except OSError:
                continue
            details.append({
                'path': artifact_path,
                'name': os.path.basename(artifact_path),
                'size': info.st_size,
                'modified': info.st_mtime,
                'type': self._get_artifact_type(artifact_path)
            })

        return details

    def _get_artifact_type(self, artifact_path: str) -> str:
        """Classify an artifact by its case-insensitive file suffix."""
        suffix = Path(artifact_path).suffix.lower()
        return self._ARTIFACT_TYPES.get(suffix, 'other')

    # ------------------------------------------------------------------
    # Reproducibility
    # ------------------------------------------------------------------

    def verify_reproducible_build(self, source_package: str, **kwargs) -> Dict[str, Any]:
        """Verify that a build is reproducible by building twice and comparing results.

        Artifact hashes are captured right after each build. Both builds may
        write to the same paths, so hashing only at the end would compare the
        second build's files with themselves.

        Returns:
            Dict with ``reproducible``, ``first_build``, ``second_build`` and
            the detailed ``comparison``.
        """
        # First build
        result1 = self.build(source_package, **kwargs)
        hashes1 = self._hash_artifacts(result1.get('artifacts', []))

        # Clean chroot for second build
        chroot_name = kwargs.get('chroot_name') or self.config.chroot_name
        if self.chroot_manager.chroot_exists(chroot_name):
            self.chroot_manager.clean_chroot(chroot_name)

        # Second build
        result2 = self.build(source_package, **kwargs)
        hashes2 = self._hash_artifacts(result2.get('artifacts', []))

        # Compare results
        comparison = self._compare_build_results(result1, result2, hashes1, hashes2)

        return {
            'reproducible': comparison['identical'],
            'first_build': result1,
            'second_build': result2,
            'comparison': comparison
        }

    def _hash_artifacts(self, artifacts: List[str]) -> Dict[str, str]:
        """Return ``{path: sha256}`` for every artifact that can be read."""
        digests = {}
        for path in artifacts:
            try:
                digests[path] = self._get_file_hash(path)
            except OSError:
                # Unreadable or missing artifacts simply have no snapshot.
                continue
        return digests

    def _compare_build_results(self, result1: Dict[str, Any], result2: Dict[str, Any],
                               hashes1: Optional[Dict[str, str]] = None,
                               hashes2: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Compare two build results for reproducibility.

        Args:
            result1: Result of the first build.
            result2: Result of the second build.
            hashes1: Optional ``{artifact path: sha256}`` snapshot taken right
                after the first build.
            hashes2: Optional snapshot taken right after the second build.

        Returns:
            Dict with ``identical`` (bool), ``differences`` (list of messages)
            and ``artifact_comparison`` (per-artifact hashes and verdict).

        When a snapshot has no entry for an artifact, the file currently on
        disk is hashed instead. An artifact with no hash on either side is
        skipped; a hash on only one side counts as a difference.
        """
        comparison = {
            'identical': True,
            'differences': [],
            'artifact_comparison': {}
        }

        # Compare artifacts
        artifacts1 = set(result1.get('artifacts', []))
        artifacts2 = set(result2.get('artifacts', []))

        if artifacts1 != artifacts2:
            comparison['identical'] = False
            comparison['differences'].append('Different artifacts produced')

        snapshot1 = hashes1 or {}
        snapshot2 = hashes2 or {}

        # Compare individual artifacts (sorted for a stable report order)
        for artifact in sorted(artifacts1 & artifacts2):
            hash1 = snapshot1.get(artifact)
            hash2 = snapshot2.get(artifact)

            if hash1 is None or hash2 is None:
                current = self._get_file_hash(artifact) if os.path.exists(artifact) else None
                hash1 = current if hash1 is None else hash1
                hash2 = current if hash2 is None else hash2

            if hash1 is None and hash2 is None:
                continue

            same = hash1 == hash2
            comparison['artifact_comparison'][artifact] = {
                'identical': same,
                'hash1': hash1,
                'hash2': hash2
            }

            if not same:
                comparison['identical'] = False
                comparison['differences'].append(f'Artifact {artifact} differs')

        return comparison

    def _get_file_hash(self, file_path: str) -> str:
        """Return the hex SHA256 digest of *file_path*, read in chunks."""
        digest = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)

        return digest.hexdigest()

    # ------------------------------------------------------------------
    # Build history
    # ------------------------------------------------------------------

    def get_build_history(self) -> list:
        """Get build history from metadata store"""
        return self.metadata_manager.get_build_history()

    def get_build_info(self, build_id: str) -> Optional[Dict[str, Any]]:
        """Get information about a specific build"""
        return self.metadata_manager.get_build_info(build_id)

    # ------------------------------------------------------------------
    # Package management inside the default chroot
    # ------------------------------------------------------------------

    def install_dependencies(self, source_package: str) -> Dict[str, Any]:
        """Install build dependencies for a source package.

        Returns:
            Dict with ``success`` (always True when no exception is raised),
            ``installed`` and either ``details`` (the installer's return
            value) or ``message`` when nothing was missing.
        """
        chroot_name = self._ensure_default_chroot()

        # Check and install dependencies
        deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
        missing = deps_check['missing']
        if not missing:
            return {
                'success': True,
                'installed': [],
                'message': 'All dependencies already satisfied'
            }

        details = self.sbuild_wrapper.install_build_dependencies(missing, chroot_name)
        return {
            'success': True,
            'installed': missing,
            'details': details
        }

    def install_packages(self, packages: List[str]) -> Dict[str, Any]:
        """Install packages in the chroot environment.

        Returns:
            Dict with ``success``, ``installed``, ``output`` and ``error``
            (``None`` unless the command failed).
        """
        chroot_name = self._ensure_default_chroot()

        # NOTE(review): package names are interpolated unquoted into one
        # command string; confirm how execute_in_chroot runs string commands
        # before passing untrusted names.
        result = self.chroot_manager.execute_in_chroot(
            chroot_name,
            f"{self.config.apt_install_command} {' '.join(packages)}",
            as_root=True
        )

        return self._command_report(result, 'installed', packages)

    def update_packages(self, packages: Optional[List[str]] = None) -> Dict[str, Any]:
        """Update packages in the chroot environment.

        Args:
            packages: Packages to upgrade; when empty or ``None`` the package
                lists are refreshed and everything is upgraded.

        Returns:
            Dict with ``success``, ``updated`` (the package list or the
            string 'all'), ``output`` and ``error``.
        """
        chroot_name = self._ensure_default_chroot()

        if packages:
            # Update specific packages
            cmd = f"{self.config.apt_command} install --only-upgrade {' '.join(packages)}"
        else:
            # Update all packages
            cmd = f"{self.config.apt_command} update && {self.config.apt_command} upgrade -y"

        result = self.chroot_manager.execute_in_chroot(chroot_name, cmd, as_root=True)

        return self._command_report(result, 'updated', packages if packages else 'all')

    def remove_packages(self, packages: List[str]) -> Dict[str, Any]:
        """Remove packages from the chroot environment.

        Returns:
            Dict with ``success``, ``removed``, ``output`` and ``error``.
        """
        chroot_name = self._ensure_default_chroot()

        # Remove packages using APT
        cmd = f"{self.config.apt_command} remove -y {' '.join(packages)}"
        result = self.chroot_manager.execute_in_chroot(chroot_name, cmd, as_root=True)

        return self._command_report(result, 'removed', packages)

    def execute_apt_command(self, command: str) -> Dict[str, Any]:
        """Execute APT command in the chroot environment.

        Args:
            command: Arguments appended to the configured APT command.

        Returns:
            Dict with ``success``, ``command``, ``output`` and ``error``.
        """
        chroot_name = self._ensure_default_chroot()

        # Execute APT command
        cmd = f"{self.config.apt_command} {command}"
        result = self.chroot_manager.execute_in_chroot(chroot_name, cmd, as_root=True)

        return self._command_report(result, 'command', command)