"""
CompressLogs Plugin for Deb-Mock

This plugin compresses build logs to save disk space, inspired by Fedora's
Mock compress_logs plugin but adapted for Debian-based systems.
"""

import logging
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional

from .base import BasePlugin

logger = logging.getLogger(__name__)


class CompressLogsPlugin(BasePlugin):
    """
    Compress build logs to save disk space.

    The plugin registers a ``process_logs`` hook. When the hook fires, every
    matching log file found directly inside the context's ``log_dir`` is
    compressed with an external tool (gzip, bzip2, xz, zstd, or a
    user-supplied shell command).
    """

    # File-name suffixes that mark a file as already compressed.
    _COMPRESSED_SUFFIXES = ('.gz', '.bz2', '.xz', '.lzma', '.zst')

    def __init__(self, config, hook_manager):
        """Initialize the CompressLogs plugin and cache its settings."""
        super().__init__(config, hook_manager)
        self.compression = self._get_compression_settings()
        self._log_info(f"Initialized with compression: {self.compression['method']}")

    def _register_hooks(self):
        """Register log compression hooks."""
        self.hook_manager.add_hook("process_logs", self.process_logs)
        self._log_debug("Registered process_logs hook")

    def _get_compression_settings(self) -> Dict[str, Any]:
        """
        Get compression settings from configuration.

        Missing keys fall back to the defaults shown below.

        Returns:
            Dictionary with the keys ``method``, ``level``, ``extensions``,
            ``exclude_patterns``, ``min_size`` and ``command``.
        """
        plugin_config = self._get_plugin_config()

        return {
            'method': plugin_config.get('compression', 'gzip'),
            'level': plugin_config.get('level', 9),
            'extensions': plugin_config.get('extensions', ['.log']),
            'exclude_patterns': plugin_config.get('exclude_patterns', []),
            # Minimum file size (bytes) to compress
            'min_size': plugin_config.get('min_size', 0),
            # Custom compression command (shell template using "{file}")
            'command': plugin_config.get('command', None),
        }

    def process_logs(self, context: Dict[str, Any]) -> None:
        """
        Compress build logs found in the context's log directory.

        Files smaller than ``min_size`` and files that already carry a
        compressed suffix are skipped. A failure on one file is logged and
        does not stop the remaining files from being processed.

        Args:
            context: Context dictionary; only the ``log_dir`` key is read.
        """
        if not self.enabled:
            return

        log_dir = context.get('log_dir')
        if not log_dir:
            self._log_warning("No log_dir in context, skipping log compression")
            return

        if not os.path.exists(log_dir):
            self._log_warning(f"Log directory does not exist: {log_dir}")
            return

        self._log_info(f"Compressing logs in {log_dir}")

        compressed_count = 0
        total_size_saved = 0
        min_size = self.compression['min_size']

        for log_file in self._find_log_files(log_dir):
            try:
                original_size = os.path.getsize(log_file)

                # Check minimum size requirement
                if original_size < min_size:
                    self._log_debug(f"Skipping {log_file} (size {original_size} < {min_size})")
                    continue

                # Check if already compressed
                if self._is_already_compressed(log_file):
                    self._log_debug(f"Skipping already compressed file: {log_file}")
                    continue

                # Compress the file
                compressed_size = self._compress_file(log_file)
                if compressed_size is not None:
                    compressed_count += 1
                    size_saved = original_size - compressed_size
                    total_size_saved += size_saved
                    self._log_debug(
                        f"Compressed {log_file}: {original_size} -> {compressed_size} bytes (saved {size_saved})"
                    )
            except Exception as e:
                # Deliberately broad: compression is best-effort, and one bad
                # file must not abort the rest of the batch.
                self._log_error(f"Failed to compress {log_file}: {e}")

        self._log_info(f"Compressed {compressed_count} files, saved {total_size_saved} bytes")

    def _find_log_files(self, log_dir: str) -> List[str]:
        """
        Find log files to compress.

        Only the top level of ``log_dir`` is searched (no recursion). Each
        file is returned at most once even when several configured
        extensions match it, and only regular files are returned.

        Args:
            log_dir: Directory containing log files

        Returns:
            List of log file paths (as strings)
        """
        base = Path(log_dir)
        seen = set()
        found: List[str] = []

        for extension in self.compression['extensions']:
            for candidate in base.glob(f"*{extension}"):
                # Overlapping extensions (e.g. ".log" and "g") would otherwise
                # yield the same file twice; the second compression attempt
                # would fail because the first one removed the source.
                if candidate in seen:
                    continue
                seen.add(candidate)

                if not candidate.is_file():
                    continue
                if self._is_excluded(candidate.name):
                    continue
                found.append(str(candidate))

        return found

    def _is_excluded(self, filename: str) -> bool:
        """
        Check if file should be excluded from compression.

        A file is excluded when any configured exclude pattern occurs as a
        plain substring of its name (no glob or regex interpretation).

        Args:
            filename: Name of the file to check

        Returns:
            True if file should be excluded, False otherwise
        """
        return any(pattern in filename for pattern in self.compression['exclude_patterns'])

    def _is_already_compressed(self, file_path: str) -> bool:
        """
        Check if file is already compressed, judged by its suffix only.

        Args:
            file_path: Path to the file to check

        Returns:
            True if file is already compressed, False otherwise
        """
        # str.endswith accepts a tuple of candidate suffixes.
        return file_path.endswith(self._COMPRESSED_SUFFIXES)

    def _compress_file(self, file_path: str) -> Optional[int]:
        """
        Compress a single file.

        A configured custom command takes precedence over ``method``.

        Args:
            file_path: Path to the file to compress

        Returns:
            Size of the compressed file, or None if compression failed
        """
        # Use custom command if specified
        if self.compression['command']:
            return self._compress_with_custom_command(file_path)

        method = self.compression['method']
        level = self.compression['level']

        # Use standard compression methods
        handlers = {
            'gzip': self._compress_gzip,
            'bzip2': self._compress_bzip2,
            'xz': self._compress_xz,
            'zstd': self._compress_zstd,
        }
        handler = handlers.get(method)
        if handler is None:
            self._log_error(f"Unsupported compression method: {method}")
            return None
        return handler(file_path, level)

    def _run_compressor(self, tool: str, options: List[str], file_path: str, suffix: str) -> Optional[int]:
        """
        Run an external compressor on ``file_path`` and report the result size.

        Args:
            tool: Executable name (also used in error messages)
            options: Command-line options placed before the file path
            file_path: Path to the file to compress
            suffix: Suffix the tool appends to the compressed output

        Returns:
            Size of ``file_path + suffix`` in bytes, or None if the tool
            failed, could not be started, or produced no such file
        """
        # "--" ends option parsing so a path starting with "-" is not taken
        # for an option.
        cmd = [tool, *options, '--', file_path]
        try:
            # stdin is detached so the tool can never block waiting for an
            # interactive answer (e.g. an overwrite confirmation).
            subprocess.run(
                cmd,
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            self._log_error(f"{tool} compression failed: {e.stderr}")
            return None
        except OSError as e:
            # Typically the executable is not installed.
            self._log_error(f"{tool} compression failed: {e}")
            return None

        compressed_path = f"{file_path}{suffix}"
        return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None

    def _compress_gzip(self, file_path: str, level: int) -> Optional[int]:
        """Compress file using gzip; returns the ``.gz`` size or None."""
        return self._run_compressor('gzip', [f'-{level}'], file_path, '.gz')

    def _compress_bzip2(self, file_path: str, level: int) -> Optional[int]:
        """Compress file using bzip2; returns the ``.bz2`` size or None."""
        return self._run_compressor('bzip2', [f'-{level}'], file_path, '.bz2')

    def _compress_xz(self, file_path: str, level: int) -> Optional[int]:
        """Compress file using xz; returns the ``.xz`` size or None."""
        return self._run_compressor('xz', [f'-{level}'], file_path, '.xz')

    def _compress_zstd(self, file_path: str, level: int) -> Optional[int]:
        """Compress file using zstd; returns the ``.zst`` size or None."""
        # Unlike gzip/bzip2/xz, zstd keeps the source file unless --rm is
        # given; without it no disk space would actually be saved.
        return self._run_compressor('zstd', ['-q', f'-{level}', '--rm'], file_path, '.zst')

    def _compress_with_custom_command(self, file_path: str) -> Optional[int]:
        """
        Compress file using the configured custom shell command.

        The command template is expanded with ``str.format(file=file_path)``
        and executed through the shell.

        Args:
            file_path: Path to the file to compress

        Returns:
            Size of the first existing ``file_path + <known suffix>`` file,
            or None if the command failed or no such file was found
        """
        # NOTE(security): the path is interpolated into a shell string
        # unquoted. It is not quoted automatically here because existing
        # templates may already quote "{file}" themselves; templates should
        # quote the placeholder if log file names can contain shell
        # metacharacters.
        command = self.compression['command'].format(file=file_path)
        try:
            subprocess.run(
                command,
                shell=True,
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            self._log_error(f"Custom compression command failed: {e.stderr}")
            return None

        # Try to determine compressed file size
        # This is a best-effort approach since custom commands may vary
        for ext in ['.gz', '.bz2', '.xz', '.zst', '.lzma']:
            compressed_path = f"{file_path}{ext}"
            if os.path.exists(compressed_path):
                return os.path.getsize(compressed_path)

        return None

    def validate_config(self, config: Any) -> bool:
        """
        Validate plugin configuration.

        Checks the compression method (unless a custom command is set), the
        compression level (integer 1-9), that ``extensions`` is a list, and
        that ``min_size`` is a non-negative integer.

        Args:
            config: Configuration to validate; its ``plugins`` attribute is
                read for the ``compress_logs`` entry

        Returns:
            True if configuration is valid, False otherwise
        """
        # Tolerate a missing or None "plugins" attribute / plugin entry.
        plugins = getattr(config, 'plugins', None) or {}
        plugin_config = plugins.get('compress_logs') or {}

        # Validate compression method
        valid_methods = ['gzip', 'bzip2', 'xz', 'zstd']
        method = plugin_config.get('compression', 'gzip')
        if method not in valid_methods and not plugin_config.get('command'):
            self._log_error(f"Invalid compression method: {method}. Valid methods: {valid_methods}")
            return False

        # Validate compression level (bool is an int subclass, so reject it
        # explicitly: it would otherwise produce an option like "-True").
        level = plugin_config.get('level', 9)
        if isinstance(level, bool) or not isinstance(level, int) or level < 1 or level > 9:
            self._log_error(f"Invalid compression level: {level}. Must be 1-9")
            return False

        # Validate extensions
        extensions = plugin_config.get('extensions', ['.log'])
        if not isinstance(extensions, list):
            self._log_error("Extensions must be a list")
            return False

        # Validate min_size
        min_size = plugin_config.get('min_size', 0)
        if not isinstance(min_size, int) or min_size < 0:
            self._log_error(f"Invalid min_size: {min_size}. Must be non-negative integer")
            return False

        return True

    def get_plugin_info(self) -> Dict[str, Any]:
        """
        Get plugin information.

        Returns:
            The base plugin information extended with the active compression
            method, level, extensions, minimum size and registered hooks
        """
        info = super().get_plugin_info()
        info.update({
            'compression_method': self.compression['method'],
            'compression_level': self.compression['level'],
            'extensions': self.compression['extensions'],
            'min_size': self.compression['min_size'],
            'hooks': ['process_logs'],
        })
        return info