deb-mock/deb_mock/cache.py
2025-08-03 22:16:04 +00:00

299 lines
No EOL
10 KiB
Python

"""
Cache management for deb-mock
"""
import os
import shutil
import tarfile
import hashlib
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from .exceptions import DebMockError
class CacheManager:
"""Manages various caches for deb-mock (root cache, package cache, ccache)"""
def __init__(self, config):
self.config = config
def get_root_cache_path(self) -> str:
"""Get the root cache path for the current chroot"""
return self.config.get_root_cache_path()
def get_package_cache_path(self) -> str:
"""Get the package cache path for the current chroot"""
return self.config.get_package_cache_path()
def get_ccache_path(self) -> str:
"""Get the ccache path for the current chroot"""
return self.config.get_ccache_path()
def create_root_cache(self, chroot_path: str) -> bool:
"""Create a root cache from the current chroot"""
if not self.config.use_root_cache:
return False
cache_path = self.get_root_cache_path()
cache_file = f"{cache_path}.tar.gz"
try:
# Create cache directory
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
# Create tar.gz archive of the chroot
with tarfile.open(cache_file, 'w:gz') as tar:
tar.add(chroot_path, arcname=os.path.basename(chroot_path))
# Update cache metadata
self._update_cache_metadata('root_cache', cache_file)
return True
except Exception as e:
raise DebMockError(f"Failed to create root cache: {e}")
def restore_root_cache(self, chroot_path: str) -> bool:
"""Restore chroot from root cache"""
if not self.config.use_root_cache:
return False
cache_file = f"{self.get_root_cache_path()}.tar.gz"
if not os.path.exists(cache_file):
return False
# Check cache age
if not self._is_cache_valid('root_cache', cache_file):
return False
try:
# Extract cache to chroot path
with tarfile.open(cache_file, 'r:gz') as tar:
tar.extractall(path=os.path.dirname(chroot_path))
return True
except Exception as e:
raise DebMockError(f"Failed to restore root cache: {e}")
def create_package_cache(self, package_files: list) -> bool:
"""Create a package cache from downloaded packages"""
if not self.config.use_package_cache:
return False
cache_path = self.get_package_cache_path()
try:
# Create cache directory
os.makedirs(cache_path, exist_ok=True)
# Copy package files to cache
for package_file in package_files:
if os.path.exists(package_file):
shutil.copy2(package_file, cache_path)
return True
except Exception as e:
raise DebMockError(f"Failed to create package cache: {e}")
def get_cached_packages(self) -> list:
"""Get list of cached packages"""
if not self.config.use_package_cache:
return []
cache_path = self.get_package_cache_path()
if not os.path.exists(cache_path):
return []
packages = []
for file in os.listdir(cache_path):
if file.endswith('.deb'):
packages.append(os.path.join(cache_path, file))
return packages
def setup_ccache(self) -> bool:
"""Setup ccache for the build environment"""
if not self.config.use_ccache:
return False
ccache_path = self.get_ccache_path()
try:
# Create ccache directory
os.makedirs(ccache_path, exist_ok=True)
# Set ccache environment variables
os.environ['CCACHE_DIR'] = ccache_path
os.environ['CCACHE_HASHDIR'] = '1'
return True
except Exception as e:
raise DebMockError(f"Failed to setup ccache: {e}")
def cleanup_old_caches(self) -> Dict[str, int]:
"""Clean up old cache files"""
cleaned = {}
# Clean root caches
if self.config.use_root_cache:
cleaned['root_cache'] = self._cleanup_root_caches()
# Clean package caches
if self.config.use_package_cache:
cleaned['package_cache'] = self._cleanup_package_caches()
# Clean ccache
if self.config.use_ccache:
cleaned['ccache'] = self._cleanup_ccache()
return cleaned
def _cleanup_root_caches(self) -> int:
"""Clean up old root cache files"""
cache_dir = os.path.dirname(self.get_root_cache_path())
if not os.path.exists(cache_dir):
return 0
cleaned = 0
cutoff_time = datetime.now() - timedelta(days=self.config.root_cache_age)
for cache_file in os.listdir(cache_dir):
if cache_file.endswith('.tar.gz'):
cache_path = os.path.join(cache_dir, cache_file)
if os.path.getmtime(cache_path) < cutoff_time.timestamp():
os.remove(cache_path)
cleaned += 1
return cleaned
def _cleanup_package_caches(self) -> int:
"""Clean up old package cache files"""
cache_path = self.get_package_cache_path()
if not os.path.exists(cache_path):
return 0
cleaned = 0
cutoff_time = datetime.now() - timedelta(days=30) # 30 days for package cache
for package_file in os.listdir(cache_path):
if package_file.endswith('.deb'):
package_path = os.path.join(cache_path, package_file)
if os.path.getmtime(package_path) < cutoff_time.timestamp():
os.remove(package_path)
cleaned += 1
return cleaned
def _cleanup_ccache(self) -> int:
"""Clean up old ccache files"""
ccache_path = self.get_ccache_path()
if not os.path.exists(ccache_path):
return 0
# Use ccache's built-in cleanup
try:
import subprocess
result = subprocess.run(['ccache', '-c'], cwd=ccache_path, capture_output=True)
return 1 if result.returncode == 0 else 0
except Exception:
return 0
def _update_cache_metadata(self, cache_type: str, cache_file: str) -> None:
"""Update cache metadata"""
metadata_file = f"{cache_file}.meta"
metadata = {
'type': cache_type,
'created': datetime.now().isoformat(),
'size': os.path.getsize(cache_file),
'hash': self._get_file_hash(cache_file)
}
import json
with open(metadata_file, 'w') as f:
json.dump(metadata, f)
def _is_cache_valid(self, cache_type: str, cache_file: str) -> bool:
"""Check if cache is still valid"""
metadata_file = f"{cache_file}.meta"
if not os.path.exists(metadata_file):
return False
try:
import json
with open(metadata_file, 'r') as f:
metadata = json.load(f)
# Check if file size matches
if os.path.getsize(cache_file) != metadata.get('size', 0):
return False
# Check if hash matches
if self._get_file_hash(cache_file) != metadata.get('hash', ''):
return False
# Check age for root cache
if cache_type == 'root_cache':
created = datetime.fromisoformat(metadata['created'])
cutoff_time = datetime.now() - timedelta(days=self.config.root_cache_age)
if created < cutoff_time:
return False
return True
except Exception:
return False
def _get_file_hash(self, file_path: str) -> str:
"""Get SHA256 hash of a file"""
hash_sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
stats = {}
# Root cache stats
if self.config.use_root_cache:
cache_file = f"{self.get_root_cache_path()}.tar.gz"
if os.path.exists(cache_file):
stats['root_cache'] = {
'size': os.path.getsize(cache_file),
'valid': self._is_cache_valid('root_cache', cache_file)
}
# Package cache stats
if self.config.use_package_cache:
cache_path = self.get_package_cache_path()
if os.path.exists(cache_path):
packages = [f for f in os.listdir(cache_path) if f.endswith('.deb')]
stats['package_cache'] = {
'packages': len(packages),
'size': sum(os.path.getsize(os.path.join(cache_path, p)) for p in packages)
}
# ccache stats
if self.config.use_ccache:
ccache_path = self.get_ccache_path()
if os.path.exists(ccache_path):
try:
import subprocess
result = subprocess.run(['ccache', '-s'], cwd=ccache_path,
capture_output=True, text=True)
stats['ccache'] = {
'output': result.stdout
}
except Exception:
pass
return stats