deb-mock/deb_mock/plugins/compress_logs.py
robojerk 5e7f4b0562
Some checks failed
Build Deb-Mock Package / build (push) Successful in 55s
Lint Code / Lint All Code (push) Failing after 3s
Test Deb-Mock Build / test (push) Failing after 53s
Fix sbuild integration and clean up codebase
- Fix environment variable handling in sbuild wrapper
- Remove unsupported --log-dir and --env options from sbuild command
- Clean up unused imports and fix linting issues
- Organize examples directory with official Debian hello package
- Fix YAML formatting (trailing spaces, newlines)
- Remove placeholder example files
- All tests passing (30/30)
- Successfully tested build with official Debian hello package
2025-08-04 04:34:32 +00:00

309 lines
11 KiB
Python

"""
CompressLogs Plugin for Deb-Mock
This plugin compresses build logs to save disk space,
inspired by Fedora's Mock compress_logs plugin but adapted for Debian-based systems.
"""
import logging
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, List
from .base import BasePlugin
logger = logging.getLogger(__name__)
class CompressLogsPlugin(BasePlugin):
"""
Compress build logs to save disk space.
This plugin automatically compresses build logs after build completion,
which is useful for CI/CD environments and long-term log storage.
"""
def __init__(self, config, hook_manager):
"""Initialize the CompressLogs plugin."""
super().__init__(config, hook_manager)
self.compression = self._get_compression_settings()
self._log_info(f"Initialized with compression: {self.compression['method']}")
def _register_hooks(self):
"""Register log compression hooks."""
self.hook_manager.add_hook("process_logs", self.process_logs)
self._log_debug("Registered process_logs hook")
def _get_compression_settings(self) -> Dict[str, Any]:
"""
Get compression settings from configuration.
Returns:
Dictionary with compression settings
"""
plugin_config = self._get_plugin_config()
return {
"method": plugin_config.get("compression", "gzip"),
"level": plugin_config.get("level", 9),
"extensions": plugin_config.get("extensions", [".log"]),
"exclude_patterns": plugin_config.get("exclude_patterns", []),
"min_size": plugin_config.get("min_size", 0), # Minimum file size to compress
"command": plugin_config.get("command", None), # Custom compression command
}
def process_logs(self, context: Dict[str, Any]) -> None:
"""
Compress build logs after build completion.
Args:
context: Context dictionary with log information
"""
if not self.enabled:
return
log_dir = context.get("log_dir")
if not log_dir:
self._log_warning("No log_dir in context, skipping log compression")
return
if not os.path.exists(log_dir):
self._log_warning(f"Log directory does not exist: {log_dir}")
return
self._log_info(f"Compressing logs in {log_dir}")
compressed_count = 0
total_size_saved = 0
for log_file in self._find_log_files(log_dir):
try:
original_size = os.path.getsize(log_file)
# Check minimum size requirement
if original_size < self.compression["min_size"]:
self._log_debug(f"Skipping {log_file} (size {original_size} < {self.compression['min_size']})")
continue
# Check if already compressed
if self._is_already_compressed(log_file):
self._log_debug(f"Skipping already compressed file: {log_file}")
continue
# Compress the file
compressed_size = self._compress_file(log_file)
if compressed_size is not None:
compressed_count += 1
size_saved = original_size - compressed_size
total_size_saved += size_saved
self._log_debug(
f"Compressed {log_file}: {original_size} -> {compressed_size} bytes (saved {size_saved})"
)
except Exception as e:
self._log_error(f"Failed to compress {log_file}: {e}")
self._log_info(f"Compressed {compressed_count} files, saved {total_size_saved} bytes")
def _find_log_files(self, log_dir: str) -> List[str]:
"""
Find log files to compress.
Args:
log_dir: Directory containing log files
Returns:
List of log file paths
"""
log_files = []
for extension in self.compression["extensions"]:
pattern = f"*{extension}"
log_files.extend(Path(log_dir).glob(pattern))
# Filter out excluded patterns
filtered_files = []
for log_file in log_files:
if not self._is_excluded(log_file.name):
filtered_files.append(str(log_file))
return filtered_files
def _is_excluded(self, filename: str) -> bool:
"""
Check if file should be excluded from compression.
Args:
filename: Name of the file to check
Returns:
True if file should be excluded, False otherwise
"""
for pattern in self.compression["exclude_patterns"]:
if pattern in filename:
return True
return False
def _is_already_compressed(self, file_path: str) -> bool:
"""
Check if file is already compressed.
Args:
file_path: Path to the file to check
Returns:
True if file is already compressed, False otherwise
"""
compressed_extensions = [".gz", ".bz2", ".xz", ".lzma", ".zst"]
return any(file_path.endswith(ext) for ext in compressed_extensions)
def _compress_file(self, file_path: str) -> int:
"""
Compress a single file.
Args:
file_path: Path to the file to compress
Returns:
Size of the compressed file, or None if compression failed
"""
method = self.compression["method"]
level = self.compression["level"]
# Use custom command if specified
if self.compression["command"]:
return self._compress_with_custom_command(file_path)
# Use standard compression methods
if method == "gzip":
return self._compress_gzip(file_path, level)
elif method == "bzip2":
return self._compress_bzip2(file_path, level)
elif method == "xz":
return self._compress_xz(file_path, level)
elif method == "zstd":
return self._compress_zstd(file_path, level)
else:
self._log_error(f"Unsupported compression method: {method}")
return None
def _compress_gzip(self, file_path: str, level: int) -> int:
"""Compress file using gzip."""
try:
cmd = ["gzip", f"-{level}", file_path]
subprocess.run(cmd, capture_output=True, text=True, check=True)
compressed_path = f"{file_path}.gz"
return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None
except subprocess.CalledProcessError as e:
self._log_error(f"gzip compression failed: {e.stderr}")
return None
def _compress_bzip2(self, file_path: str, level: int) -> int:
"""Compress file using bzip2."""
try:
cmd = ["bzip2", f"-{level}", file_path]
subprocess.run(cmd, capture_output=True, text=True, check=True)
compressed_path = f"{file_path}.bz2"
return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None
except subprocess.CalledProcessError as e:
self._log_error(f"bzip2 compression failed: {e.stderr}")
return None
def _compress_xz(self, file_path: str, level: int) -> int:
"""Compress file using xz."""
try:
cmd = ["xz", f"-{level}", file_path]
subprocess.run(cmd, capture_output=True, text=True, check=True)
compressed_path = f"{file_path}.xz"
return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None
except subprocess.CalledProcessError as e:
self._log_error(f"xz compression failed: {e.stderr}")
return None
def _compress_zstd(self, file_path: str, level: int) -> int:
"""Compress file using zstd."""
try:
cmd = ["zstd", f"-{level}", file_path]
subprocess.run(cmd, capture_output=True, text=True, check=True)
compressed_path = f"{file_path}.zst"
return os.path.getsize(compressed_path) if os.path.exists(compressed_path) else None
except subprocess.CalledProcessError as e:
self._log_error(f"zstd compression failed: {e.stderr}")
return None
def _compress_with_custom_command(self, file_path: str) -> int:
"""Compress file using custom command."""
try:
command = self.compression["command"].format(file=file_path)
subprocess.run(command, shell=True, capture_output=True, text=True, check=True)
# Try to determine compressed file size
# This is a best-effort approach since custom commands may vary
for ext in [".gz", ".bz2", ".xz", ".zst", ".lzma"]:
compressed_path = f"{file_path}{ext}"
if os.path.exists(compressed_path):
return os.path.getsize(compressed_path)
return None
except subprocess.CalledProcessError as e:
self._log_error(f"Custom compression command failed: {e.stderr}")
return None
def validate_config(self, config: Any) -> bool:
"""
Validate plugin configuration.
Args:
config: Configuration to validate
Returns:
True if configuration is valid, False otherwise
"""
plugin_config = getattr(config, "plugins", {}).get("compress_logs", {})
# Validate compression method
valid_methods = ["gzip", "bzip2", "xz", "zstd"]
method = plugin_config.get("compression", "gzip")
if method not in valid_methods and not plugin_config.get("command"):
self._log_error(f"Invalid compression method: {method}. Valid methods: {valid_methods}")
return False
# Validate compression level
level = plugin_config.get("level", 9)
if not isinstance(level, int) or level < 1 or level > 9:
self._log_error(f"Invalid compression level: {level}. Must be 1-9")
return False
# Validate extensions
extensions = plugin_config.get("extensions", [".log"])
if not isinstance(extensions, list):
self._log_error("Extensions must be a list")
return False
# Validate min_size
min_size = plugin_config.get("min_size", 0)
if not isinstance(min_size, int) or min_size < 0:
self._log_error(f"Invalid min_size: {min_size}. Must be non-negative integer")
return False
return True
def get_plugin_info(self) -> Dict[str, Any]:
"""
Get plugin information.
Returns:
Dictionary with plugin information
"""
info = super().get_plugin_info()
info.update(
{
"compression_method": self.compression["method"],
"compression_level": self.compression["level"],
"extensions": self.compression["extensions"],
"min_size": self.compression["min_size"],
"hooks": ["process_logs"],
}
)
return info