"""Cache management for deb-mock."""

import hashlib
import json
import os
import shutil
import subprocess
import tarfile
from datetime import datetime, timedelta
from typing import Any, Dict

from .exceptions import DebMockError

# Cached .deb files older than this many days are removed by
# CacheManager._cleanup_package_caches().
_PACKAGE_CACHE_MAX_AGE_DAYS = 30

# Read size used when hashing cache archives. The digest does not depend on
# the chunk size; a larger chunk just means fewer read calls on big archives.
_HASH_CHUNK_SIZE = 1024 * 1024


class CacheManager:
    """Manages various caches for deb-mock (root cache, package cache, ccache).

    All paths and feature switches are read from the ``config`` object passed
    to the constructor. The attributes and methods used here are
    ``use_root_cache``, ``use_package_cache``, ``use_ccache``,
    ``root_cache_age``, ``get_root_cache_path()``,
    ``get_package_cache_path()`` and ``get_ccache_path()``.
    """

    def __init__(self, config):
        """Store the configuration object used to locate and enable caches."""
        self.config = config

    def get_root_cache_path(self) -> str:
        """Get the root cache path for the current chroot.

        The archive itself lives at this path with ``.tar.gz`` appended.
        """
        return self.config.get_root_cache_path()

    def get_package_cache_path(self) -> str:
        """Get the package cache path for the current chroot."""
        return self.config.get_package_cache_path()

    def get_ccache_path(self) -> str:
        """Get the ccache path for the current chroot."""
        return self.config.get_ccache_path()

    def create_root_cache(self, chroot_path: str) -> bool:
        """Create a root cache archive from the given chroot.

        The chroot is stored in a gzip-compressed tarball under the basename
        of ``chroot_path``, and a ``.meta`` sidecar with size, hash and
        creation time is written next to it.

        Args:
            chroot_path: Directory to archive.

        Returns:
            True when the archive and its metadata were written, False when
            the root cache is disabled in the configuration.

        Raises:
            DebMockError: If creating the archive or its metadata fails.
        """
        if not self.config.use_root_cache:
            return False

        cache_file = f"{self.get_root_cache_path()}.tar.gz"

        try:
            # A bare file name has no directory component, and
            # os.makedirs("") would raise.
            cache_dir = os.path.dirname(cache_file)
            if cache_dir:
                os.makedirs(cache_dir, exist_ok=True)

            with tarfile.open(cache_file, "w:gz") as tar:
                tar.add(chroot_path, arcname=os.path.basename(chroot_path))

            self._update_cache_metadata("root_cache", cache_file)
            return True
        except Exception as e:
            raise DebMockError(f"Failed to create root cache: {e}") from e

    def restore_root_cache(self, chroot_path: str) -> bool:
        """Restore a chroot from the root cache archive.

        The archive is extracted into the parent directory of ``chroot_path``,
        mirroring how create_root_cache() stores it under its basename.

        Args:
            chroot_path: Location the chroot should be restored to.

        Returns:
            True when the archive was extracted; False when the root cache is
            disabled, the archive is missing, or it fails validation
            (metadata missing or mismatching, or too old).

        Raises:
            DebMockError: If extraction fails.
        """
        if not self.config.use_root_cache:
            return False

        cache_file = f"{self.get_root_cache_path()}.tar.gz"
        if not os.path.exists(cache_file):
            return False

        if not self._is_cache_valid("root_cache", cache_file):
            return False

        # NOTE(review): the archive is extracted without member filtering.
        # A chroot image is expected to contain absolute symlinks and possibly
        # device nodes, which the restrictive "data" filter (the default from
        # Python 3.14) rejects. Request the historical behaviour explicitly
        # where the interpreter supports extraction filters. This assumes the
        # archive was produced locally by create_root_cache().
        extract_kwargs = {}
        if hasattr(tarfile, "fully_trusted_filter"):
            extract_kwargs["filter"] = "fully_trusted"

        try:
            with tarfile.open(cache_file, "r:gz") as tar:
                tar.extractall(path=os.path.dirname(chroot_path), **extract_kwargs)
            return True
        except Exception as e:
            raise DebMockError(f"Failed to restore root cache: {e}") from e

    def create_package_cache(self, package_files: list) -> bool:
        """Copy downloaded packages into the package cache.

        Args:
            package_files: Paths of package files to cache. Paths that do not
                exist are skipped.

        Returns:
            True when the copy loop completed, False when the package cache is
            disabled in the configuration.

        Raises:
            DebMockError: If the cache directory cannot be created or a copy
                fails.
        """
        if not self.config.use_package_cache:
            return False

        cache_path = self.get_package_cache_path()

        try:
            os.makedirs(cache_path, exist_ok=True)

            for package_file in package_files:
                if os.path.exists(package_file):
                    shutil.copy2(package_file, cache_path)

            return True
        except Exception as e:
            raise DebMockError(f"Failed to create package cache: {e}") from e

    def get_cached_packages(self) -> list:
        """Get the list of cached packages.

        Returns:
            Full paths of every ``.deb`` entry in the package cache directory;
            an empty list when the package cache is disabled or the directory
            does not exist.
        """
        if not self.config.use_package_cache:
            return []

        cache_path = self.get_package_cache_path()
        if not os.path.exists(cache_path):
            return []

        return [
            os.path.join(cache_path, entry)
            for entry in os.listdir(cache_path)
            if entry.endswith(".deb")
        ]

    def setup_ccache(self) -> bool:
        """Set up ccache for the build environment.

        Creates the ccache directory and exports ``CCACHE_DIR`` and
        ``CCACHE_HASHDIR`` into this process's environment.

        Returns:
            True on success, False when ccache is disabled in the
            configuration.

        Raises:
            DebMockError: If the ccache directory cannot be created.
        """
        if not self.config.use_ccache:
            return False

        ccache_path = self.get_ccache_path()

        try:
            os.makedirs(ccache_path, exist_ok=True)

            os.environ["CCACHE_DIR"] = ccache_path
            os.environ["CCACHE_HASHDIR"] = "1"

            return True
        except Exception as e:
            raise DebMockError(f"Failed to setup ccache: {e}") from e

    def cleanup_old_caches(self) -> Dict[str, int]:
        """Clean up old cache files for every enabled cache type.

        Returns:
            Mapping from cache type (``"root_cache"``, ``"package_cache"``,
            ``"ccache"``) to the value returned by its cleanup helper. Only
            enabled cache types appear in the mapping.
        """
        cleaned: Dict[str, int] = {}

        if self.config.use_root_cache:
            cleaned["root_cache"] = self._cleanup_root_caches()

        if self.config.use_package_cache:
            cleaned["package_cache"] = self._cleanup_package_caches()

        if self.config.use_ccache:
            cleaned["ccache"] = self._cleanup_ccache()

        return cleaned

    def _cleanup_root_caches(self) -> int:
        """Remove root cache archives older than ``config.root_cache_age`` days.

        Every ``.tar.gz`` in the directory holding the root cache is checked
        by modification time. The ``.meta`` sidecar of a removed archive is
        removed too, so no orphaned metadata is left behind.

        Returns:
            Number of archives removed (sidecar files are not counted).
        """
        cache_dir = os.path.dirname(self.get_root_cache_path())
        if not os.path.exists(cache_dir):
            return 0

        cutoff = (datetime.now() - timedelta(days=self.config.root_cache_age)).timestamp()

        cleaned = 0
        for entry in os.listdir(cache_dir):
            if not entry.endswith(".tar.gz"):
                continue

            archive = os.path.join(cache_dir, entry)
            if os.path.getmtime(archive) >= cutoff:
                continue

            os.remove(archive)
            cleaned += 1

            try:
                os.remove(f"{archive}.meta")
            except FileNotFoundError:
                # The archive had no metadata sidecar; nothing to clean up.
                pass

        return cleaned

    def _cleanup_package_caches(self) -> int:
        """Remove cached ``.deb`` files older than the retention period.

        Returns:
            Number of package files removed.
        """
        cache_path = self.get_package_cache_path()
        if not os.path.exists(cache_path):
            return 0

        cutoff = (datetime.now() - timedelta(days=_PACKAGE_CACHE_MAX_AGE_DAYS)).timestamp()

        cleaned = 0
        for entry in os.listdir(cache_path):
            if not entry.endswith(".deb"):
                continue

            package_path = os.path.join(cache_path, entry)
            if os.path.getmtime(package_path) < cutoff:
                os.remove(package_path)
                cleaned += 1

        return cleaned

    def _ccache_env(self, ccache_path: str) -> Dict[str, str]:
        """Return a copy of the environment with CCACHE_DIR set to ``ccache_path``.

        ccache picks its cache directory from ``CCACHE_DIR`` or its
        configuration, not from the working directory, so the variable has to
        be set for the command to act on the configured cache.
        """
        env = dict(os.environ)
        env["CCACHE_DIR"] = ccache_path
        return env

    def _cleanup_ccache(self) -> int:
        """Run ccache's built-in cleanup (``ccache -c``) on the configured cache.

        Returns:
            1 when ccache exited successfully; 0 when the cache directory does
            not exist, ccache could not be run, or it exited with an error.
        """
        ccache_path = self.get_ccache_path()
        if not os.path.exists(ccache_path):
            return 0

        try:
            result = subprocess.run(
                ["ccache", "-c"],
                cwd=ccache_path,
                env=self._ccache_env(ccache_path),
                capture_output=True,
            )
        except (OSError, subprocess.SubprocessError):
            # Best effort: ccache may not be installed.
            return 0

        return 1 if result.returncode == 0 else 0

    def _update_cache_metadata(self, cache_type: str, cache_file: str) -> None:
        """Write the ``.meta`` sidecar describing ``cache_file``.

        The sidecar is JSON with the cache type, creation time (ISO format,
        local naive time), file size in bytes and SHA256 hash.
        """
        metadata_file = f"{cache_file}.meta"

        metadata = {
            "type": cache_type,
            "created": datetime.now().isoformat(),
            "size": os.path.getsize(cache_file),
            "hash": self._get_file_hash(cache_file),
        }

        with open(metadata_file, "w", encoding="utf-8") as f:
            json.dump(metadata, f)

    def _is_cache_valid(self, cache_type: str, cache_file: str) -> bool:
        """Check whether ``cache_file`` still matches its metadata.

        A cache is valid when its ``.meta`` sidecar exists and the recorded
        size and SHA256 hash match the file. A ``"root_cache"`` must also be
        no older than ``config.root_cache_age`` days.

        Returns:
            True when all checks pass; False otherwise, including when the
            metadata is missing, unreadable or malformed.
        """
        metadata_file = f"{cache_file}.meta"

        if not os.path.exists(metadata_file):
            return False

        try:
            with open(metadata_file, "r", encoding="utf-8") as f:
                metadata = json.load(f)

            # Well-formed JSON that is not an object cannot be our metadata.
            if not isinstance(metadata, dict):
                return False

            # Cheap size comparison first; hashing reads the whole archive.
            if os.path.getsize(cache_file) != metadata.get("size", 0):
                return False

            if self._get_file_hash(cache_file) != metadata.get("hash", ""):
                return False

            if cache_type == "root_cache":
                created = datetime.fromisoformat(metadata["created"])
                cutoff_time = datetime.now() - timedelta(days=self.config.root_cache_age)
                if created < cutoff_time:
                    return False

            return True
        except (OSError, ValueError, KeyError, TypeError, OverflowError):
            # Unreadable or malformed metadata means the cache cannot be
            # trusted. json.JSONDecodeError is a ValueError subclass.
            return False

    def _get_file_hash(self, file_path: str) -> str:
        """Get the SHA256 hex digest of a file, reading it in chunks."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(_HASH_CHUNK_SIZE), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get statistics for every enabled cache that exists on disk.

        Returns:
            Mapping that may contain:

            * ``"root_cache"``: ``{"size": bytes, "valid": bool}``
            * ``"package_cache"``: ``{"packages": count, "size": bytes}``
            * ``"ccache"``: ``{"output": stdout of "ccache -s"}``

            A key is omitted when that cache is disabled or missing, or, for
            ccache, when the command could not be run.
        """
        stats: Dict[str, Any] = {}

        if self.config.use_root_cache:
            cache_file = f"{self.get_root_cache_path()}.tar.gz"
            if os.path.exists(cache_file):
                stats["root_cache"] = {
                    "size": os.path.getsize(cache_file),
                    "valid": self._is_cache_valid("root_cache", cache_file),
                }

        if self.config.use_package_cache:
            cache_path = self.get_package_cache_path()
            if os.path.exists(cache_path):
                packages = [f for f in os.listdir(cache_path) if f.endswith(".deb")]
                stats["package_cache"] = {
                    "packages": len(packages),
                    "size": sum(os.path.getsize(os.path.join(cache_path, p)) for p in packages),
                }

        if self.config.use_ccache:
            ccache_path = self.get_ccache_path()
            if os.path.exists(ccache_path):
                try:
                    result = subprocess.run(
                        ["ccache", "-s"],
                        cwd=ccache_path,
                        env=self._ccache_env(ccache_path),
                        capture_output=True,
                        text=True,
                    )
                except (OSError, subprocess.SubprocessError):
                    # Best effort: leave ccache out of the stats when the
                    # binary is unavailable.
                    pass
                else:
                    stats["ccache"] = {"output": result.stdout}

        return stats