""" Cache management for deb-mock """ import os import shutil import tarfile import hashlib from pathlib import Path from typing import Optional, Dict, Any from datetime import datetime, timedelta from .exceptions import DebMockError class CacheManager: """Manages various caches for deb-mock (root cache, package cache, ccache)""" def __init__(self, config): self.config = config def get_root_cache_path(self) -> str: """Get the root cache path for the current chroot""" return self.config.get_root_cache_path() def get_package_cache_path(self) -> str: """Get the package cache path for the current chroot""" return self.config.get_package_cache_path() def get_ccache_path(self) -> str: """Get the ccache path for the current chroot""" return self.config.get_ccache_path() def create_root_cache(self, chroot_path: str) -> bool: """Create a root cache from the current chroot""" if not self.config.use_root_cache: return False cache_path = self.get_root_cache_path() cache_file = f"{cache_path}.tar.gz" try: # Create cache directory os.makedirs(os.path.dirname(cache_file), exist_ok=True) # Create tar.gz archive of the chroot with tarfile.open(cache_file, 'w:gz') as tar: tar.add(chroot_path, arcname=os.path.basename(chroot_path)) # Update cache metadata self._update_cache_metadata('root_cache', cache_file) return True except Exception as e: raise DebMockError(f"Failed to create root cache: {e}") def restore_root_cache(self, chroot_path: str) -> bool: """Restore chroot from root cache""" if not self.config.use_root_cache: return False cache_file = f"{self.get_root_cache_path()}.tar.gz" if not os.path.exists(cache_file): return False # Check cache age if not self._is_cache_valid('root_cache', cache_file): return False try: # Extract cache to chroot path with tarfile.open(cache_file, 'r:gz') as tar: tar.extractall(path=os.path.dirname(chroot_path)) return True except Exception as e: raise DebMockError(f"Failed to restore root cache: {e}") def create_package_cache(self, package_files: list) -> bool: """Create a package cache from downloaded packages""" if not self.config.use_package_cache: return False cache_path = self.get_package_cache_path() try: # Create cache directory os.makedirs(cache_path, exist_ok=True) # Copy package files to cache for package_file in package_files: if os.path.exists(package_file): shutil.copy2(package_file, cache_path) return True except Exception as e: raise DebMockError(f"Failed to create package cache: {e}") def get_cached_packages(self) -> list: """Get list of cached packages""" if not self.config.use_package_cache: return [] cache_path = self.get_package_cache_path() if not os.path.exists(cache_path): return [] packages = [] for file in os.listdir(cache_path): if file.endswith('.deb'): packages.append(os.path.join(cache_path, file)) return packages def setup_ccache(self) -> bool: """Setup ccache for the build environment""" if not self.config.use_ccache: return False ccache_path = self.get_ccache_path() try: # Create ccache directory os.makedirs(ccache_path, exist_ok=True) # Set ccache environment variables os.environ['CCACHE_DIR'] = ccache_path os.environ['CCACHE_HASHDIR'] = '1' return True except Exception as e: raise DebMockError(f"Failed to setup ccache: {e}") def cleanup_old_caches(self) -> Dict[str, int]: """Clean up old cache files""" cleaned = {} # Clean root caches if self.config.use_root_cache: cleaned['root_cache'] = self._cleanup_root_caches() # Clean package caches if self.config.use_package_cache: cleaned['package_cache'] = self._cleanup_package_caches() # Clean ccache if self.config.use_ccache: cleaned['ccache'] = self._cleanup_ccache() return cleaned def _cleanup_root_caches(self) -> int: """Clean up old root cache files""" cache_dir = os.path.dirname(self.get_root_cache_path()) if not os.path.exists(cache_dir): return 0 cleaned = 0 cutoff_time = datetime.now() - timedelta(days=self.config.root_cache_age) for cache_file in os.listdir(cache_dir): if cache_file.endswith('.tar.gz'): cache_path = os.path.join(cache_dir, cache_file) if os.path.getmtime(cache_path) < cutoff_time.timestamp(): os.remove(cache_path) cleaned += 1 return cleaned def _cleanup_package_caches(self) -> int: """Clean up old package cache files""" cache_path = self.get_package_cache_path() if not os.path.exists(cache_path): return 0 cleaned = 0 cutoff_time = datetime.now() - timedelta(days=30) # 30 days for package cache for package_file in os.listdir(cache_path): if package_file.endswith('.deb'): package_path = os.path.join(cache_path, package_file) if os.path.getmtime(package_path) < cutoff_time.timestamp(): os.remove(package_path) cleaned += 1 return cleaned def _cleanup_ccache(self) -> int: """Clean up old ccache files""" ccache_path = self.get_ccache_path() if not os.path.exists(ccache_path): return 0 # Use ccache's built-in cleanup try: import subprocess result = subprocess.run(['ccache', '-c'], cwd=ccache_path, capture_output=True) return 1 if result.returncode == 0 else 0 except Exception: return 0 def _update_cache_metadata(self, cache_type: str, cache_file: str) -> None: """Update cache metadata""" metadata_file = f"{cache_file}.meta" metadata = { 'type': cache_type, 'created': datetime.now().isoformat(), 'size': os.path.getsize(cache_file), 'hash': self._get_file_hash(cache_file) } import json with open(metadata_file, 'w') as f: json.dump(metadata, f) def _is_cache_valid(self, cache_type: str, cache_file: str) -> bool: """Check if cache is still valid""" metadata_file = f"{cache_file}.meta" if not os.path.exists(metadata_file): return False try: import json with open(metadata_file, 'r') as f: metadata = json.load(f) # Check if file size matches if os.path.getsize(cache_file) != metadata.get('size', 0): return False # Check if hash matches if self._get_file_hash(cache_file) != metadata.get('hash', ''): return False # Check age for root cache if cache_type == 'root_cache': created = datetime.fromisoformat(metadata['created']) cutoff_time = datetime.now() - timedelta(days=self.config.root_cache_age) if created < cutoff_time: return False return True except Exception: return False def _get_file_hash(self, file_path: str) -> str: """Get SHA256 hash of a file""" hash_sha256 = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_sha256.update(chunk) return hash_sha256.hexdigest() def get_cache_stats(self) -> Dict[str, Any]: """Get cache statistics""" stats = {} # Root cache stats if self.config.use_root_cache: cache_file = f"{self.get_root_cache_path()}.tar.gz" if os.path.exists(cache_file): stats['root_cache'] = { 'size': os.path.getsize(cache_file), 'valid': self._is_cache_valid('root_cache', cache_file) } # Package cache stats if self.config.use_package_cache: cache_path = self.get_package_cache_path() if os.path.exists(cache_path): packages = [f for f in os.listdir(cache_path) if f.endswith('.deb')] stats['package_cache'] = { 'packages': len(packages), 'size': sum(os.path.getsize(os.path.join(cache_path, p)) for p in packages) } # ccache stats if self.config.use_ccache: ccache_path = self.get_ccache_path() if os.path.exists(ccache_path): try: import subprocess result = subprocess.run(['ccache', '-s'], cwd=ccache_path, capture_output=True, text=True) stats['ccache'] = { 'output': result.stdout } except Exception: pass return stats