""" CompressLogs Plugin for Deb-Mock This plugin compresses build logs to save disk space, inspired by Fedora's Mock compress_logs plugin but adapted for Debian-based systems. """ import logging import os import subprocess from pathlib import Path from typing import Any, Dict, List from .base import BasePlugin logger = logging.getLogger(__name__) class CompressLogsPlugin(BasePlugin): """ Compress build logs to save disk space. This plugin automatically compresses build logs after build completion, which is useful for CI/CD environments and long-term log storage. """ def __init__(self, config, hook_manager): """Initialize the CompressLogs plugin.""" super().__init__(config, hook_manager) self.compression = self._get_compression_settings() self._log_info(f"Initialized with compression: {self.compression['method']}") def _register_hooks(self): """Register log compression hooks.""" self.hook_manager.add_hook("process_logs", self.process_logs) self._log_debug("Registered process_logs hook") def _get_compression_settings(self) -> Dict[str, Any]: """ Get compression settings from configuration. Returns: Dictionary with compression settings """ plugin_config = self._get_plugin_config() return { "method": plugin_config.get("compression", "gzip"), "level": plugin_config.get("level", 9), "extensions": plugin_config.get("extensions", [".log"]), "exclude_patterns": plugin_config.get("exclude_patterns", []), "min_size": plugin_config.get("min_size", 0), # Minimum file size to compress "command": plugin_config.get("command", None), # Custom compression command } def process_logs(self, context: Dict[str, Any]) -> None: """ Compress build logs after build completion. Args: context: Context dictionary with log information """ if not self.enabled: return log_dir = context.get("log_dir") if not log_dir: self._log_warning("No log_dir in context, skipping log compression") return if not os.path.exists(log_dir): self._log_warning(f"Log directory does not exist: {log_dir}") return self._log_info(f"Compressing logs in {log_dir}") compressed_count = 0 total_size_saved = 0 for log_file in self._find_log_files(log_dir): try: original_size = os.path.getsize(log_file) # Check minimum size requirement if original_size < self.compression["min_size"]: self._log_debug(f"Skipping {log_file} (size {original_size} < {self.compression['min_size']})") continue # Check if already compressed if self._is_already_compressed(log_file): self._log_debug(f"Skipping already compressed file: {log_file}") continue # Compress the file compressed_size = self._compress_file(log_file) if compressed_size is not None: compressed_count += 1 size_saved = original_size - compressed_size total_size_saved += size_saved self._log_debug( f"Compressed {log_file}: {original_size} -> {compressed_size} bytes (saved {size_saved})" ) except Exception as e: self._log_error(f"Failed to compress {log_file}: {e}") self._log_info(f"Compressed {compressed_count} files, saved {total_size_saved} bytes") def _find_log_files(self, log_dir: str) -> List[str]: """ Find log files to compress. Args: log_dir: Directory containing log files Returns: List of log file paths """ log_files = [] for extension in self.compression["extensions"]: pattern = f"*{extension}" log_files.extend(Path(log_dir).glob(pattern)) # Filter out excluded patterns filtered_files = [] for log_file in log_files: if not self._is_excluded(log_file.name): filtered_files.append(str(log_file)) return filtered_files def _is_excluded(self, filename: str) -> bool: """ Check if file should be excluded from compression. Args: filename: Name of the file to check Returns: True if file should be excluded, False otherwise """ for pattern in self.compression["exclude_patterns"]: if pattern in filename: return True return False def _is_already_compressed(self, file_path: str) -> bool: """ Check if file is already compressed. Args: file_path: Path to the file to check Returns: True if file is already compressed, False otherwise """ compressed_extensions = [".gz", ".bz2", ".xz", ".lzma", ".zst"] return any(file_path.endswith(ext) for ext in compressed_extensions) def _compress_file(self, file_path: str) -> int: """ Compress a single file. Args: file_path: Path to the file to compress Returns: Size of the compressed file, or None if compression failed """ method = self.compression["method"] level = self.compression["level"] # Use custom command if specified if self.compression["command"]: return self._compress_with_custom_command(file_path) # Use standard compression methods if method == "gzip": return self._compress_gzip(file_path, level) elif method == "bzip2": return self._compress_bzip2(file_path, level) elif method == "xz": return self._compress_xz(file_path, level) elif method == "zstd": return self._compress_zstd(file_path, level) else: self._log_error(f"Unsupported compression method: {method}") return None def _compress_gzip(self, file_path: str, level: int) -> int: """Compress file using gzip.""" try: cmd = ["gzip", f"-{level}", file_path] subprocess.run(cmd, capture_output=True, text=True, check=True) compressed_path = f"{file_path}.gz" return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None except subprocess.CalledProcessError as e: self._log_error(f"gzip compression failed: {e.stderr}") return None def _compress_bzip2(self, file_path: str, level: int) -> int: """Compress file using bzip2.""" try: cmd = ["bzip2", f"-{level}", file_path] subprocess.run(cmd, capture_output=True, text=True, check=True) compressed_path = f"{file_path}.bz2" return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None except subprocess.CalledProcessError as e: self._log_error(f"bzip2 compression failed: {e.stderr}") return None def _compress_xz(self, file_path: str, level: int) -> int: """Compress file using xz.""" try: cmd = ["xz", f"-{level}", file_path] subprocess.run(cmd, capture_output=True, text=True, check=True) compressed_path = f"{file_path}.xz" return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None except subprocess.CalledProcessError as e: self._log_error(f"xz compression failed: {e.stderr}") return None def _compress_zstd(self, file_path: str, level: int) -> int: """Compress file using zstd.""" try: cmd = ["zstd", f"-{level}", file_path] subprocess.run(cmd, capture_output=True, text=True, check=True) compressed_path = f"{file_path}.zst" return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None except subprocess.CalledProcessError as e: self._log_error(f"zstd compression failed: {e.stderr}") return None def _compress_with_custom_command(self, file_path: str) -> int: """Compress file using custom command.""" try: command = self.compression["command"].format(file=file_path) subprocess.run(command, shell=True, capture_output=True, text=True, check=True) # Try to determine compressed file size # This is a best-effort approach since custom commands may vary for ext in [".gz", ".bz2", ".xz", ".zst", ".lzma"]: compressed_path = f"{file_path}{ext}" if os.path.exists(compressed_path): return os.path.getsize(compressed_path) return None except subprocess.CalledProcessError as e: self._log_error(f"Custom compression command failed: {e.stderr}") return None def validate_config(self, config: Any) -> bool: """ Validate plugin configuration. Args: config: Configuration to validate Returns: True if configuration is valid, False otherwise """ plugin_config = getattr(config, "plugins", {}).get("compress_logs", {}) # Validate compression method valid_methods = ["gzip", "bzip2", "xz", "zstd"] method = plugin_config.get("compression", "gzip") if method not in valid_methods and not plugin_config.get("command"): self._log_error(f"Invalid compression method: {method}. Valid methods: {valid_methods}") return False # Validate compression level level = plugin_config.get("level", 9) if not isinstance(level, int) or level < 1 or level > 9: self._log_error(f"Invalid compression level: {level}. Must be 1-9") return False # Validate extensions extensions = plugin_config.get("extensions", [".log"]) if not isinstance(extensions, list): self._log_error("Extensions must be a list") return False # Validate min_size min_size = plugin_config.get("min_size", 0) if not isinstance(min_size, int) or min_size < 0: self._log_error(f"Invalid min_size: {min_size}. Must be non-negative integer") return False return True def get_plugin_info(self) -> Dict[str, Any]: """ Get plugin information. Returns: Dictionary with plugin information """ info = super().get_plugin_info() info.update( { "compression_method": self.compression["method"], "compression_level": self.compression["level"], "extensions": self.compression["extensions"], "min_size": self.compression["min_size"], "hooks": ["process_logs"], } ) return info