488 lines
No EOL
17 KiB
Python
488 lines
No EOL
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script for enhanced configuration management
|
|
"""
|
|
|
|
import os
|
|
import tempfile
|
|
import yaml
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add the apt-ostree module to path
|
|
sys.path.insert(0, str(Path(__file__).parent / "src/apt-ostree.py/python"))
|
|
|
|
from utils.config import ConfigManager, ConfigValidator, ValidationError
|
|
|
|
def test_basic_configuration():
    """Load a configuration through ConfigManager and print a few values.

    A ConfigManager is created for ``/tmp/test-config.yaml`` and
    ``load_config()`` is called on it.

    Returns:
        True when ``load_config()`` returns a truthy value, False otherwise.
    """
    print("=== Testing Basic Configuration ===\n")

    manager = ConfigManager("/tmp/test-config.yaml")

    # Bail out early when nothing usable was loaded.
    if not manager.load_config():
        print("❌ Failed to load default configuration")
        return False

    print("✅ Default configuration loaded successfully")
    print(f" D-Bus bus name: {manager.get('daemon.dbus.bus_name')}")
    print(f" Log level: {manager.get('daemon.logging.level')}")
    print(f" Max workers: {manager.get('daemon.concurrency.max_workers')}")
    return True
|
|
|
|
def test_configuration_validation():
    """Check that ConfigValidator accepts a valid config and rejects bad ones.

    A complete configuration is validated first and must pass. Then three
    variants are validated, each differing from the valid configuration in
    exactly one value (log level, max_workers, D-Bus bus name); each of them
    must be rejected.

    Returns:
        True when the valid configuration passes and every invalid variant
        is rejected, False at the first unexpected result.
    """
    # Function-local import so this block does not depend on the file header.
    import copy

    print("\n=== Testing Configuration Validation ===\n")

    valid_config = {
        'daemon': {
            'dbus': {
                'bus_name': 'org.debian.aptostree1',
                'object_path': '/org/debian/aptostree1',
            },
            'concurrency': {
                'max_workers': 5,
                'transaction_timeout': 600,
            },
            'logging': {
                'level': 'DEBUG',
                'format': 'json',
                'file': '/var/log/apt-ostree/test.log',
                'max_size': '50MB',
                'max_files': 3,
                'rotation_strategy': 'size',
                'rotation_interval': 1,
                'rotation_unit': 'D',
                'compression': True,
                'correlation_id': True,
                'performance_monitoring': True,
                'cleanup_old_logs': True,
                'cleanup_days': 7,
                'include_hostname': True,
                'include_version': True,
            },
            'auto_update_policy': 'check',
        },
        'sysroot': {
            'path': '/',
            'repo_path': '/var/lib/ostree/repo',
        },
        'shell_integration': {
            'script_path': '/usr/local/bin/apt-layer.sh',
            'timeout': {
                'install': 300,
                'remove': 300,
                'composefs': 600,
                'dkms': 1800,
            },
        },
        'hardware_detection': {
            'auto_configure': True,
            'gpu_detection': True,
            'cpu_detection': True,
            'motherboard_detection': True,
        },
        'dkms': {
            'enabled': True,
            'auto_rebuild': True,
            'build_timeout': 3600,
            'kernel_hooks': True,
        },
        'security': {
            'polkit_required': True,
            'apparmor_profile': '/etc/apparmor.d/apt-ostree',
            'selinux_context': 'system_u:system_r:apt_ostree_t:s0',
            'privilege_separation': True,
        },
        'performance': {
            'cache_enabled': True,
            'cache_ttl': 3600,
            'parallel_operations': True,
        },
        'experimental': {
            'composefs': False,
            'hardware_detection': False,
        },
    }

    validator = ConfigValidator()

    if not validator.validate_config(valid_config):
        print("❌ Valid configuration failed validation:")
        print(validator.format_errors())
        return False
    print("✅ Valid configuration passed validation")

    print("\nTesting invalid configurations:")

    # (label used in the messages, path of nested keys, invalid value)
    invalid_cases = [
        ("log level", ('daemon', 'logging', 'level'), 'INVALID_LEVEL'),
        ("max_workers", ('daemon', 'concurrency', 'max_workers'), 100),  # Too high
        ("D-Bus bus name", ('daemon', 'dbus', 'bus_name'), 'invalid-bus-name'),
    ]

    for label, (section, group, key), bad_value in invalid_cases:
        # Deep copy: dict.copy() is shallow and would share the nested dicts
        # with valid_config, so a bad value injected for one case would still
        # be present in the next one and mask a validator that fails to
        # reject that next value on its own.
        candidate = copy.deepcopy(valid_config)
        candidate[section][group][key] = bad_value

        if validator.validate_config(candidate):
            print(f"❌ Should have rejected invalid {label}")
            return False
        print(f"✅ Correctly rejected invalid {label}")

    return True
|
|
|
|
def test_environment_variables():
    """Check that APT_OSTREE_* environment variables show up in the config.

    Four environment variables are set, a ConfigManager for
    ``/tmp/test-env-config.yaml`` is loaded, and the corresponding config
    keys are compared against the expected (typed) values: the test expects
    ``'8'`` to be read back as ``8``, ``'false'`` as ``False`` and
    ``'7200'`` as ``7200``.

    The environment is restored in a ``finally`` clause, so the overrides do
    not leak into later tests when this one fails or raises, and variables
    that were already set before the test keep their previous values.

    Returns:
        True when the configuration loads and all four values match,
        False otherwise.
    """
    print("\n=== Testing Environment Variables ===\n")

    # Test environment variables (double underscore for nesting).
    overrides = {
        'APT_OSTREE_DAEMON__LOGGING__LEVEL': 'DEBUG',
        'APT_OSTREE_DAEMON__CONCURRENCY__MAX_WORKERS': '8',
        'APT_OSTREE_DAEMON__LOGGING__COMPRESSION': 'false',
        'APT_OSTREE_PERFORMANCE__CACHE_TTL': '7200',
    }

    # Config key -> value the test expects ConfigManager.get() to return.
    expectations = [
        ('daemon.logging.level', 'DEBUG'),
        ('daemon.concurrency.max_workers', 8),
        ('daemon.logging.compression', False),
        ('performance.cache_ttl', 7200),
    ]

    # Remember what was there before so the caller's environment survives.
    previous = {name: os.environ.get(name) for name in overrides}
    os.environ.update(overrides)

    try:
        config_manager = ConfigManager("/tmp/test-env-config.yaml")

        if not config_manager.load_config():
            print("❌ Failed to load configuration with environment variables")
            return False

        print("✅ Configuration loaded with environment variables")
        print(f" Log level from env: {config_manager.get('daemon.logging.level')}")
        print(f" Max workers from env: {config_manager.get('daemon.concurrency.max_workers')}")
        print(f" Compression from env: {config_manager.get('daemon.logging.compression')}")
        print(f" Cache TTL from env: {config_manager.get('performance.cache_ttl')}")

        # Equality (not identity) on purpose: same acceptance rule as before.
        if not all(config_manager.get(key) == wanted for key, wanted in expectations):
            print("❌ Environment variables not applied correctly")
            for key, wanted in expectations:
                print(f" Expected: {wanted}, got: {config_manager.get(key)}")
            return False

        print("✅ Environment variables correctly applied")
        return True
    finally:
        # Runs on success, on failure and on exceptions alike.
        for name, old_value in previous.items():
            if old_value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old_value
|
|
|
|
def test_configuration_file_operations():
    """Round-trip a configuration through a temporary YAML file.

    Steps:
        1. Write a small YAML config to a temporary file.
        2. Load it with ConfigManager and compare three values with what was
           written.
        3. Change two values with ``set()``, ``save()``, then load the file
           with a second ConfigManager and compare the two changed values.

    The temporary file is removed in a ``finally`` clause.

    Returns:
        True when every step succeeds, False at the first failing step.
    """
    print("\n=== Testing Configuration File Operations ===\n")

    seed_settings = {
        'daemon': {
            'logging': {
                'level': 'WARNING',
                'file': '/tmp/test.log'
            },
            'concurrency': {
                'max_workers': 4
            }
        },
        'experimental': {
            'composefs': True
        }
    }

    # delete=False: the file has to outlive this ``with`` so it can be
    # reopened by path below; it is unlinked in the ``finally`` clause.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as handle:
        yaml.dump(seed_settings, handle)
        config_path = handle.name

    try:
        manager = ConfigManager(config_path)

        if not manager.load_config():
            print("❌ Failed to load configuration from file")
            return False

        print("✅ Configuration loaded from file")
        print(f" Log level: {manager.get('daemon.logging.level')}")
        print(f" Max workers: {manager.get('daemon.concurrency.max_workers')}")
        print(f" ComposeFS experimental: {manager.get('experimental.composefs')}")

        # Values from the file are expected to be what get() returns.
        file_values_applied = (
            manager.get('daemon.logging.level') == 'WARNING'
            and manager.get('daemon.concurrency.max_workers') == 4
            and manager.get('experimental.composefs') == True
        )
        if not file_values_applied:
            print("❌ File values not applied correctly")
            print(f" Expected: WARNING, got: {manager.get('daemon.logging.level')}")
            print(f" Expected: 4, got: {manager.get('daemon.concurrency.max_workers')}")
            print(f" Expected: True, got: {manager.get('experimental.composefs')}")
            return False
        print("✅ File values correctly override defaults")

        # Modify two values and write them back to the same file.
        manager.set('daemon.logging.level', 'ERROR')
        manager.set('performance.cache_enabled', False)

        if not manager.save():
            print("❌ Failed to save configuration")
            return False
        print("✅ Configuration saved successfully")

        # A fresh manager reads the file again to confirm the changes.
        reloaded = ConfigManager(config_path)
        reloaded.load_config()

        changes_persisted = (
            reloaded.get('daemon.logging.level') == 'ERROR'
            and reloaded.get('performance.cache_enabled') == False
        )
        if not changes_persisted:
            print("❌ Configuration changes not persisted")
            print(f" Expected: ERROR, got: {reloaded.get('daemon.logging.level')}")
            print(f" Expected: False, got: {reloaded.get('performance.cache_enabled')}")
            return False
        print("✅ Configuration changes persisted correctly")

    finally:
        # Remove the temporary file on every exit path.
        if os.path.exists(config_path):
            os.unlink(config_path)

    return True
|
|
|
|
def test_schema_export():
    """Export the configuration schema to a JSON file and inspect it.

    ``ConfigManager().export_schema(path)`` is called with the path of a
    temporary ``.json`` file. The file is then parsed with ``json.load`` and
    must contain ``type``, ``properties`` and ``properties.daemon``, and the
    daemon properties must include ``logging`` and ``dbus``.

    The temporary file is removed in a ``finally`` clause.

    Returns:
        True when the export succeeds and all checks pass, False otherwise.
    """
    print("\n=== Testing Schema Export ===\n")

    manager = ConfigManager()

    # Only the path is needed; the handle is closed right away and the file
    # is kept (delete=False) so export_schema() can write to it.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as placeholder:
        schema_path = placeholder.name

    try:
        if not manager.export_schema(schema_path):
            print("❌ Failed to export schema")
            return False
        print("✅ Schema exported successfully")

        with open(schema_path, 'r') as schema_file:
            schema = json.load(schema_file)

        # Same short-circuit order as a chained ``and``: 'properties' is only
        # indexed once it is known to be present.
        if ('type' not in schema
                or 'properties' not in schema
                or 'daemon' not in schema['properties']):
            print("❌ Schema structure is invalid")
            return False

        print("✅ Schema structure is valid")
        print(f" Schema has {len(schema['properties'])} top-level properties")

        daemon_section = schema['properties']['daemon']['properties']
        if not ('logging' in daemon_section and 'dbus' in daemon_section):
            print("❌ Schema missing expected properties")
            return False
        print("✅ Schema includes expected nested properties")

    finally:
        # Remove the temporary file on every exit path.
        if os.path.exists(schema_path):
            os.unlink(schema_path)

    return True
|
|
|
|
def test_validation_errors():
    """Check that ConfigValidator reports each expected field as an error.

    A configuration with several intentionally bad values is validated. The
    validation must fail, every reported error is printed, and six specific
    field paths must be among the reported ``error.field`` values.

    Returns:
        True when validation fails and all six expected fields are reported,
        False when validation passes or an expected field is missing.
    """
    print("\n=== Testing Validation Error Reporting ===\n")

    # Each value below is meant to violate a rule. Note that
    # 'daemon.dbus.object_path' is NOT in the required list further down.
    broken_config = {
        'daemon': {
            'dbus': {
                'bus_name': 'invalid-bus-name',  # Invalid pattern
                'object_path': '/invalid/path'  # Invalid pattern
            },
            'concurrency': {
                'max_workers': 100,  # Too high
                'transaction_timeout': 10  # Too low
            },
            'logging': {
                'level': 'INVALID_LEVEL',  # Invalid enum
                'max_size': 'invalid-size',  # Invalid pattern
                'max_files': -1  # Too low
            }
        }
    }

    validator = ConfigValidator()

    if validator.validate_config(broken_config):
        print("❌ Should have detected validation errors")
        return False

    print("✅ Correctly detected validation errors")
    issues = validator.get_errors()
    print(f" Found {len(issues)} validation errors:")

    # Print every error and collect the field names on the way.
    reported_fields = []
    for issue in issues:
        print(f" {issue.field}: {issue.message}")
        if issue.value is not None:
            print(f" Value: {issue.value}")
        reported_fields.append(issue.field)

    required_fields = [
        'daemon.dbus.bus_name',
        'daemon.concurrency.max_workers',
        'daemon.concurrency.transaction_timeout',
        'daemon.logging.level',
        'daemon.logging.max_size',
        'daemon.logging.max_files'
    ]

    # Stop at the first field that was not reported.
    for field_path in required_fields:
        if field_path not in reported_fields:
            print(f" ❌ Missing expected error: {field_path}")
            return False
        print(f" ✅ Expected error found: {field_path}")

    return True
|
|
|
|
def test_configuration_getters():
    """Check that the section getters of ConfigManager return expected keys.

    After loading a default-constructed ConfigManager, the results of
    ``get_dbus_config()``, ``get_logging_config()``,
    ``get_concurrency_config()`` and ``get_sysroot_config()`` are each
    checked for two keys.

    Returns:
        True when the configuration loads and every getter result contains
        its two keys, False at the first failure.
    """
    print("\n=== Testing Configuration Getters ===\n")

    manager = ConfigManager()

    if not manager.load_config():
        print("❌ Failed to load configuration")
        return False

    # (label used in the messages, getter method name, keys that must exist)
    getter_checks = (
        ("D-Bus", 'get_dbus_config', ('bus_name', 'object_path')),
        ("Logging", 'get_logging_config', ('level', 'format')),
        ("Concurrency", 'get_concurrency_config', ('max_workers', 'transaction_timeout')),
        ("Sysroot", 'get_sysroot_config', ('path', 'repo_path')),
    )

    for label, getter_name, required_keys in getter_checks:
        # The getter is looked up only when its turn comes, so earlier
        # results are printed before a later lookup could fail.
        section = getattr(manager, getter_name)()
        if not all(key in section for key in required_keys):
            print(f"❌ {label} config getter failed")
            return False
        print(f"✅ {label} config getter works")

    return True
|
|
|
|
def main():
    """Run every configuration test in order and print a summary.

    Each test function is called inside a ``try`` block; a truthy return
    counts as passed, a falsy return as failed, and any ``Exception`` is
    reported as an error without stopping the remaining tests.

    Returns:
        True when all tests passed, False otherwise.
    """
    print("🧪 Enhanced Configuration Management Test Suite")
    print("=" * 50)

    suite = (
        ("Basic Configuration", test_basic_configuration),
        ("Configuration Validation", test_configuration_validation),
        ("Environment Variables", test_environment_variables),
        ("File Operations", test_configuration_file_operations),
        ("Schema Export", test_schema_export),
        ("Validation Errors", test_validation_errors),
        ("Configuration Getters", test_configuration_getters),
    )

    passed = 0
    for label, run_test in suite:
        try:
            if not run_test():
                print(f"❌ {label}: FAILED")
                continue
            passed += 1
            print(f"✅ {label}: PASSED")
        except Exception as exc:
            # Top-level boundary: report and carry on with the next test.
            print(f"❌ {label}: ERROR - {exc}")

    total = len(suite)
    print("\n" + "=" * 50)
    print(f"Test Results: {passed}/{total} tests passed")

    everything_passed = passed == total
    if everything_passed:
        print("🎉 All configuration tests passed!")
    else:
        print("⚠️ Some configuration tests failed")
    return everything_passed
|
|
|
|
# Script entry point: exit status 0 when main() reports success, 1 otherwise.
if __name__ == "__main__":
    sys.exit(0 if main() else 1)