initial commit
This commit is contained in:
commit
74fe9143d9
43 changed files with 10069 additions and 0 deletions
299
deb_mock/cache.py
Normal file
299
deb_mock/cache.py
Normal file
|
|
@ -0,0 +1,299 @@
|
|||
"""
|
||||
Cache management for deb-mock
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tarfile
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any
|
||||
from datetime import datetime, timedelta
|
||||
from .exceptions import DebMockError
|
||||
|
||||
|
||||
class CacheManager:
|
||||
"""Manages various caches for deb-mock (root cache, package cache, ccache)"""
|
||||
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def get_root_cache_path(self) -> str:
|
||||
"""Get the root cache path for the current chroot"""
|
||||
return self.config.get_root_cache_path()
|
||||
|
||||
def get_package_cache_path(self) -> str:
|
||||
"""Get the package cache path for the current chroot"""
|
||||
return self.config.get_package_cache_path()
|
||||
|
||||
def get_ccache_path(self) -> str:
|
||||
"""Get the ccache path for the current chroot"""
|
||||
return self.config.get_ccache_path()
|
||||
|
||||
def create_root_cache(self, chroot_path: str) -> bool:
|
||||
"""Create a root cache from the current chroot"""
|
||||
if not self.config.use_root_cache:
|
||||
return False
|
||||
|
||||
cache_path = self.get_root_cache_path()
|
||||
cache_file = f"{cache_path}.tar.gz"
|
||||
|
||||
try:
|
||||
# Create cache directory
|
||||
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
|
||||
|
||||
# Create tar.gz archive of the chroot
|
||||
with tarfile.open(cache_file, 'w:gz') as tar:
|
||||
tar.add(chroot_path, arcname=os.path.basename(chroot_path))
|
||||
|
||||
# Update cache metadata
|
||||
self._update_cache_metadata('root_cache', cache_file)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
raise DebMockError(f"Failed to create root cache: {e}")
|
||||
|
||||
def restore_root_cache(self, chroot_path: str) -> bool:
|
||||
"""Restore chroot from root cache"""
|
||||
if not self.config.use_root_cache:
|
||||
return False
|
||||
|
||||
cache_file = f"{self.get_root_cache_path()}.tar.gz"
|
||||
|
||||
if not os.path.exists(cache_file):
|
||||
return False
|
||||
|
||||
# Check cache age
|
||||
if not self._is_cache_valid('root_cache', cache_file):
|
||||
return False
|
||||
|
||||
try:
|
||||
# Extract cache to chroot path
|
||||
with tarfile.open(cache_file, 'r:gz') as tar:
|
||||
tar.extractall(path=os.path.dirname(chroot_path))
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
raise DebMockError(f"Failed to restore root cache: {e}")
|
||||
|
||||
def create_package_cache(self, package_files: list) -> bool:
|
||||
"""Create a package cache from downloaded packages"""
|
||||
if not self.config.use_package_cache:
|
||||
return False
|
||||
|
||||
cache_path = self.get_package_cache_path()
|
||||
|
||||
try:
|
||||
# Create cache directory
|
||||
os.makedirs(cache_path, exist_ok=True)
|
||||
|
||||
# Copy package files to cache
|
||||
for package_file in package_files:
|
||||
if os.path.exists(package_file):
|
||||
shutil.copy2(package_file, cache_path)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
raise DebMockError(f"Failed to create package cache: {e}")
|
||||
|
||||
def get_cached_packages(self) -> list:
|
||||
"""Get list of cached packages"""
|
||||
if not self.config.use_package_cache:
|
||||
return []
|
||||
|
||||
cache_path = self.get_package_cache_path()
|
||||
|
||||
if not os.path.exists(cache_path):
|
||||
return []
|
||||
|
||||
packages = []
|
||||
for file in os.listdir(cache_path):
|
||||
if file.endswith('.deb'):
|
||||
packages.append(os.path.join(cache_path, file))
|
||||
|
||||
return packages
|
||||
|
||||
def setup_ccache(self) -> bool:
|
||||
"""Setup ccache for the build environment"""
|
||||
if not self.config.use_ccache:
|
||||
return False
|
||||
|
||||
ccache_path = self.get_ccache_path()
|
||||
|
||||
try:
|
||||
# Create ccache directory
|
||||
os.makedirs(ccache_path, exist_ok=True)
|
||||
|
||||
# Set ccache environment variables
|
||||
os.environ['CCACHE_DIR'] = ccache_path
|
||||
os.environ['CCACHE_HASHDIR'] = '1'
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
raise DebMockError(f"Failed to setup ccache: {e}")
|
||||
|
||||
def cleanup_old_caches(self) -> Dict[str, int]:
|
||||
"""Clean up old cache files"""
|
||||
cleaned = {}
|
||||
|
||||
# Clean root caches
|
||||
if self.config.use_root_cache:
|
||||
cleaned['root_cache'] = self._cleanup_root_caches()
|
||||
|
||||
# Clean package caches
|
||||
if self.config.use_package_cache:
|
||||
cleaned['package_cache'] = self._cleanup_package_caches()
|
||||
|
||||
# Clean ccache
|
||||
if self.config.use_ccache:
|
||||
cleaned['ccache'] = self._cleanup_ccache()
|
||||
|
||||
return cleaned
|
||||
|
||||
def _cleanup_root_caches(self) -> int:
|
||||
"""Clean up old root cache files"""
|
||||
cache_dir = os.path.dirname(self.get_root_cache_path())
|
||||
if not os.path.exists(cache_dir):
|
||||
return 0
|
||||
|
||||
cleaned = 0
|
||||
cutoff_time = datetime.now() - timedelta(days=self.config.root_cache_age)
|
||||
|
||||
for cache_file in os.listdir(cache_dir):
|
||||
if cache_file.endswith('.tar.gz'):
|
||||
cache_path = os.path.join(cache_dir, cache_file)
|
||||
if os.path.getmtime(cache_path) < cutoff_time.timestamp():
|
||||
os.remove(cache_path)
|
||||
cleaned += 1
|
||||
|
||||
return cleaned
|
||||
|
||||
def _cleanup_package_caches(self) -> int:
|
||||
"""Clean up old package cache files"""
|
||||
cache_path = self.get_package_cache_path()
|
||||
if not os.path.exists(cache_path):
|
||||
return 0
|
||||
|
||||
cleaned = 0
|
||||
cutoff_time = datetime.now() - timedelta(days=30) # 30 days for package cache
|
||||
|
||||
for package_file in os.listdir(cache_path):
|
||||
if package_file.endswith('.deb'):
|
||||
package_path = os.path.join(cache_path, package_file)
|
||||
if os.path.getmtime(package_path) < cutoff_time.timestamp():
|
||||
os.remove(package_path)
|
||||
cleaned += 1
|
||||
|
||||
return cleaned
|
||||
|
||||
def _cleanup_ccache(self) -> int:
|
||||
"""Clean up old ccache files"""
|
||||
ccache_path = self.get_ccache_path()
|
||||
if not os.path.exists(ccache_path):
|
||||
return 0
|
||||
|
||||
# Use ccache's built-in cleanup
|
||||
try:
|
||||
import subprocess
|
||||
result = subprocess.run(['ccache', '-c'], cwd=ccache_path, capture_output=True)
|
||||
return 1 if result.returncode == 0 else 0
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
def _update_cache_metadata(self, cache_type: str, cache_file: str) -> None:
|
||||
"""Update cache metadata"""
|
||||
metadata_file = f"{cache_file}.meta"
|
||||
|
||||
metadata = {
|
||||
'type': cache_type,
|
||||
'created': datetime.now().isoformat(),
|
||||
'size': os.path.getsize(cache_file),
|
||||
'hash': self._get_file_hash(cache_file)
|
||||
}
|
||||
|
||||
import json
|
||||
with open(metadata_file, 'w') as f:
|
||||
json.dump(metadata, f)
|
||||
|
||||
def _is_cache_valid(self, cache_type: str, cache_file: str) -> bool:
|
||||
"""Check if cache is still valid"""
|
||||
metadata_file = f"{cache_file}.meta"
|
||||
|
||||
if not os.path.exists(metadata_file):
|
||||
return False
|
||||
|
||||
try:
|
||||
import json
|
||||
with open(metadata_file, 'r') as f:
|
||||
metadata = json.load(f)
|
||||
|
||||
# Check if file size matches
|
||||
if os.path.getsize(cache_file) != metadata.get('size', 0):
|
||||
return False
|
||||
|
||||
# Check if hash matches
|
||||
if self._get_file_hash(cache_file) != metadata.get('hash', ''):
|
||||
return False
|
||||
|
||||
# Check age for root cache
|
||||
if cache_type == 'root_cache':
|
||||
created = datetime.fromisoformat(metadata['created'])
|
||||
cutoff_time = datetime.now() - timedelta(days=self.config.root_cache_age)
|
||||
if created < cutoff_time:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _get_file_hash(self, file_path: str) -> str:
|
||||
"""Get SHA256 hash of a file"""
|
||||
hash_sha256 = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(4096), b""):
|
||||
hash_sha256.update(chunk)
|
||||
return hash_sha256.hexdigest()
|
||||
|
||||
def get_cache_stats(self) -> Dict[str, Any]:
|
||||
"""Get cache statistics"""
|
||||
stats = {}
|
||||
|
||||
# Root cache stats
|
||||
if self.config.use_root_cache:
|
||||
cache_file = f"{self.get_root_cache_path()}.tar.gz"
|
||||
if os.path.exists(cache_file):
|
||||
stats['root_cache'] = {
|
||||
'size': os.path.getsize(cache_file),
|
||||
'valid': self._is_cache_valid('root_cache', cache_file)
|
||||
}
|
||||
|
||||
# Package cache stats
|
||||
if self.config.use_package_cache:
|
||||
cache_path = self.get_package_cache_path()
|
||||
if os.path.exists(cache_path):
|
||||
packages = [f for f in os.listdir(cache_path) if f.endswith('.deb')]
|
||||
stats['package_cache'] = {
|
||||
'packages': len(packages),
|
||||
'size': sum(os.path.getsize(os.path.join(cache_path, p)) for p in packages)
|
||||
}
|
||||
|
||||
# ccache stats
|
||||
if self.config.use_ccache:
|
||||
ccache_path = self.get_ccache_path()
|
||||
if os.path.exists(ccache_path):
|
||||
try:
|
||||
import subprocess
|
||||
result = subprocess.run(['ccache', '-s'], cwd=ccache_path,
|
||||
capture_output=True, text=True)
|
||||
stats['ccache'] = {
|
||||
'output': result.stdout
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return stats
|
||||
Loading…
Add table
Add a link
Reference in a new issue