debian-koji/security_hardening.py
2025-08-26 11:48:31 -07:00

681 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Debian Forge Security Hardening Module
This module provides security testing, hardening, and monitoring capabilities
for the Debian Forge system.
"""
import hashlib
import json
import os
import re
import sqlite3
import time
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from pathlib import Path
import secrets
@dataclass
class SecurityVulnerability:
"""Security vulnerability information"""
severity: str # "critical", "high", "medium", "low"
category: str # "authentication", "authorization", "input_validation", "data_protection"
description: str
cve_id: Optional[str] = None
affected_component: str = "unknown"
remediation: str = ""
@dataclass
class SecurityTestResult:
"""Result of a security test"""
test_name: str
passed: bool
details: str
vulnerabilities: List[SecurityVulnerability] = None
recommendations: List[str] = None
class SecurityHardening:
"""Security hardening and testing for Debian Forge"""
def __init__(self, config_file: str = "security_config.json"):
self.config_file = config_file
self.security_config = self._load_security_config()
self.vulnerabilities_db = "security_vulnerabilities.db"
self._init_vulnerability_db()
def _load_security_config(self) -> Dict[str, Any]:
"""Load security configuration"""
default_config = {
"password_policy": {
"min_length": 12,
"require_uppercase": True,
"require_lowercase": True,
"require_numbers": True,
"require_special": True,
"max_age_days": 90
},
"session_policy": {
"timeout_minutes": 30,
"max_failed_attempts": 5,
"lockout_duration_minutes": 15
},
"input_validation": {
"max_input_length": 1000,
"allowed_file_types": [".json", ".yaml", ".yml", ".txt"],
"blocked_patterns": ["<script>", "javascript:", "data:text/html"]
},
"encryption": {
"hash_algorithm": "sha256",
"salt_length": 32,
"key_derivation_rounds": 100000
}
}
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r') as f:
user_config = json.load(f)
# Merge user config with defaults
for key, value in user_config.items():
if key in default_config:
default_config[key].update(value)
else:
default_config[key] = value
except Exception as e:
print(f"Warning: Could not load security config: {e}")
return default_config
def _init_vulnerability_db(self):
"""Initialize vulnerability database"""
conn = sqlite3.connect(self.vulnerabilities_db)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS vulnerabilities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
severity TEXT NOT NULL,
category TEXT NOT NULL,
description TEXT NOT NULL,
cve_id TEXT,
affected_component TEXT NOT NULL,
remediation TEXT,
status TEXT DEFAULT 'open'
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS security_tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
test_name TEXT NOT NULL,
passed BOOLEAN NOT NULL,
details TEXT,
vulnerabilities_count INTEGER DEFAULT 0
)
""")
conn.commit()
conn.close()
def test_authentication_security(self) -> SecurityTestResult:
"""Test authentication security"""
vulnerabilities = []
recommendations = []
# Test password policy
password_config = self.security_config["password_policy"]
if password_config["min_length"] < 12:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="authentication",
description="Password minimum length is less than 12 characters",
affected_component="password_policy",
remediation="Increase minimum password length to 12 characters"
))
recommendations.append("Increase minimum password length to 12 characters")
if not password_config["require_special"]:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="authentication",
description="Special characters not required in passwords",
affected_component="password_policy",
remediation="Require special characters in passwords"
))
recommendations.append("Require special characters in passwords")
# Test session policy
session_config = self.security_config["session_policy"]
if session_config["timeout_minutes"] > 60:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="authentication",
description="Session timeout is too long",
affected_component="session_policy",
remediation="Reduce session timeout to 30 minutes or less"
))
recommendations.append("Reduce session timeout to 30 minutes or less")
if session_config["max_failed_attempts"] > 5:
vulnerabilities.append(SecurityVulnerability(
severity="high",
category="authentication",
description="Too many failed login attempts allowed",
affected_component="session_policy",
remediation="Reduce maximum failed attempts to 5 or less"
))
recommendations.append("Reduce maximum failed attempts to 5 or less")
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="Authentication Security",
passed=passed,
details=f"Found {len(vulnerabilities)} authentication vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_input_validation_security(self) -> SecurityTestResult:
"""Test input validation security"""
vulnerabilities = []
recommendations = []
input_config = self.security_config["input_validation"]
# Test input length limits
if input_config["max_input_length"] > 1000:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="input_validation",
description="Input length limit is too high",
affected_component="input_validation",
remediation="Reduce maximum input length to 1000 characters or less"
))
recommendations.append("Reduce maximum input length to 1000 characters or less")
# Test blocked patterns
blocked_patterns = input_config["blocked_patterns"]
required_patterns = ["<script>", "javascript:", "data:text/html"]
for pattern in required_patterns:
if pattern not in blocked_patterns:
vulnerabilities.append(SecurityVulnerability(
severity="high",
category="input_validation",
description=f"Missing blocked pattern: {pattern}",
affected_component="input_validation",
remediation=f"Add '{pattern}' to blocked patterns"
))
recommendations.append(f"Add '{pattern}' to blocked patterns")
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="Input Validation Security",
passed=passed,
details=f"Found {len(vulnerabilities)} input validation vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_data_protection_security(self) -> SecurityTestResult:
"""Test data protection security"""
vulnerabilities = []
recommendations = []
encryption_config = self.security_config["encryption"]
# Test hash algorithm
if encryption_config["hash_algorithm"] not in ["sha256", "sha512", "bcrypt"]:
vulnerabilities.append(SecurityVulnerability(
severity="high",
category="data_protection",
description="Weak hash algorithm in use",
affected_component="encryption",
remediation="Use SHA-256, SHA-512, or bcrypt for hashing"
))
recommendations.append("Use SHA-256, SHA-512, or bcrypt for hashing")
# Test salt length
if encryption_config["salt_length"] < 32:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="data_protection",
description="Salt length is too short",
affected_component="encryption",
remediation="Increase salt length to 32 bytes or more"
))
recommendations.append("Increase salt length to 32 bytes or more")
# Test key derivation rounds
if encryption_config["key_derivation_rounds"] < 100000:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="data_protection",
description="Key derivation rounds too low",
affected_component="encryption",
remediation="Increase key derivation rounds to 100,000 or more"
))
recommendations.append("Increase key derivation rounds to 100,000 or more")
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="Data Protection Security",
passed=passed,
details=f"Found {len(vulnerabilities)} data protection vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_file_permissions_security(self) -> SecurityTestResult:
"""Test file permissions security"""
vulnerabilities = []
recommendations = []
# Check critical files and directories
critical_paths = [
("user_management.py", 0o644),
("admin_interface_simple.py", 0o644),
("cli_integration.py", 0o644),
("composer_integration_simple.py", 0o644),
("unified_integration.py", 0o644),
("users.db", 0o600),
("admin.db", 0o600)
]
for file_path, expected_perms in critical_paths:
if os.path.exists(file_path):
try:
stat_info = os.stat(file_path)
current_perms = stat_info.st_mode & 0o777
if current_perms != expected_perms:
severity = "high" if "db" in file_path else "medium"
vulnerabilities.append(SecurityVulnerability(
severity=severity,
category="data_protection",
description=f"Insecure file permissions on {file_path}",
affected_component="file_permissions",
remediation=f"Set permissions on {file_path} to {oct(expected_perms)}"
))
recommendations.append(f"Set permissions on {file_path} to {oct(expected_perms)}")
except Exception as e:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="data_protection",
description=f"Could not check permissions on {file_path}: {e}",
affected_component="file_permissions",
remediation="Verify file permissions manually"
))
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="File Permissions Security",
passed=passed,
details=f"Found {len(vulnerabilities)} file permission vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_sql_injection_protection(self) -> SecurityTestResult:
"""Test SQL injection protection"""
vulnerabilities = []
recommendations = []
# Check for potential SQL injection vulnerabilities in code
sql_patterns = [
r"execute\(.*\+.*\)", # String concatenation in SQL
r"execute\(.*%s.*\)", # %s formatting in SQL
r"execute\(.*\{.*\}.*\)", # f-string formatting in SQL
]
python_files = [
"user_management.py",
"admin_interface_simple.py",
"cli_integration.py",
"composer_integration_simple.py",
"unified_integration.py"
]
for file_path in python_files:
if os.path.exists(file_path):
try:
with open(file_path, 'r') as f:
content = f.read()
for pattern in sql_patterns:
matches = re.findall(pattern, content)
if matches:
vulnerabilities.append(SecurityVulnerability(
severity="high",
category="input_validation",
description=f"Potential SQL injection in {file_path}",
affected_component="database_operations",
remediation="Use parameterized queries instead of string formatting"
))
recommendations.append("Use parameterized queries instead of string formatting")
break
except Exception as e:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="input_validation",
description=f"Could not analyze {file_path}: {e}",
affected_component="code_analysis",
remediation="Review file manually for SQL injection vulnerabilities"
))
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="SQL Injection Protection",
passed=passed,
details=f"Found {len(vulnerabilities)} potential SQL injection vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_cross_site_scripting_protection(self) -> SecurityTestResult:
"""Test XSS protection"""
vulnerabilities = []
recommendations = []
# Check for potential XSS vulnerabilities
xss_patterns = [
r"innerHTML\s*=",
r"document\.write\(",
r"eval\(",
r"innerHTML\s*\+=",
]
# Check HTML templates and JavaScript files
template_files = [
"templates/",
"static/js/",
"*.html",
"*.js"
]
# For now, check Python files for potential XSS in web output
python_files = [
"admin_interface_simple.py",
"composer_integration_simple.py"
]
for file_path in python_files:
if os.path.exists(file_path):
try:
with open(file_path, 'r') as f:
content = f.read()
# Look for potential XSS in web output
if "print(" in content or "return" in content:
# This is a simplified check - in real implementation,
# would need more sophisticated analysis
pass
except Exception as e:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="input_validation",
description=f"Could not analyze {file_path}: {e}",
affected_component="code_analysis",
remediation="Review file manually for XSS vulnerabilities"
))
# Add general XSS recommendations
if not vulnerabilities:
recommendations.append("Implement Content Security Policy (CSP) headers")
recommendations.append("Use HTML escaping for all user input in web output")
recommendations.append("Validate and sanitize all user input")
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="Cross-Site Scripting Protection",
passed=passed,
details=f"Found {len(vulnerabilities)} potential XSS vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def run_comprehensive_security_audit(self) -> Dict[str, Any]:
"""Run comprehensive security audit"""
audit_results = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"tests": [],
"summary": {
"total_tests": 0,
"passed_tests": 0,
"failed_tests": 0,
"total_vulnerabilities": 0,
"critical_vulnerabilities": 0,
"high_vulnerabilities": 0,
"medium_vulnerabilities": 0,
"low_vulnerabilities": 0
},
"recommendations": []
}
# Run all security tests
tests = [
self.test_authentication_security(),
self.test_input_validation_security(),
self.test_data_protection_security(),
self.test_file_permissions_security(),
self.test_sql_injection_protection(),
self.test_cross_site_scripting_protection()
]
for test in tests:
audit_results["tests"].append({
"name": test.test_name,
"passed": test.passed,
"details": test.details,
"vulnerabilities": [v.__dict__ for v in (test.vulnerabilities or [])],
"recommendations": test.recommendations or []
})
# Update summary
audit_results["summary"]["total_tests"] += 1
if test.passed:
audit_results["summary"]["passed_tests"] += 1
else:
audit_results["summary"]["failed_tests"] += 1
# Count vulnerabilities by severity
if test.vulnerabilities:
for vuln in test.vulnerabilities:
audit_results["summary"]["total_vulnerabilities"] += 1
if vuln.severity == "critical":
audit_results["summary"]["critical_vulnerabilities"] += 1
elif vuln.severity == "high":
audit_results["summary"]["high_vulnerabilities"] += 1
elif vuln.severity == "medium":
audit_results["summary"]["medium_vulnerabilities"] += 1
elif vuln.severity == "low":
audit_results["summary"]["low_vulnerabilities"] += 1
# Collect recommendations
if test.recommendations:
audit_results["recommendations"].extend(test.recommendations)
# Remove duplicate recommendations
audit_results["recommendations"] = list(set(audit_results["recommendations"]))
# Store results in database
self._store_audit_results(audit_results)
return audit_results
def _store_audit_results(self, audit_results: Dict[str, Any]):
"""Store audit results in database"""
try:
conn = sqlite3.connect(self.vulnerabilities_db)
cursor = conn.cursor()
# Store test results
for test in audit_results["tests"]:
cursor.execute("""
INSERT INTO security_tests (timestamp, test_name, passed, details, vulnerabilities_count)
VALUES (?, ?, ?, ?, ?)
""", (
audit_results["timestamp"],
test["name"],
test["passed"],
test["details"],
len(test["vulnerabilities"])
))
# Store vulnerabilities
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
cursor.execute("""
INSERT INTO vulnerabilities (timestamp, severity, category, description,
cve_id, affected_component, remediation)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
audit_results["timestamp"],
vuln["severity"],
vuln["category"],
vuln["description"],
vuln.get("cve_id"),
vuln["affected_component"],
vuln["remediation"]
))
conn.commit()
conn.close()
except Exception as e:
print(f"Warning: Could not store audit results: {e}")
def get_security_report(self) -> Dict[str, Any]:
"""Generate comprehensive security report"""
# Run fresh audit
audit_results = self.run_comprehensive_security_audit()
# Generate risk assessment
risk_score = 0
if audit_results["summary"]["critical_vulnerabilities"] > 0:
risk_score = 100
elif audit_results["summary"]["high_vulnerabilities"] > 0:
risk_score = 75
elif audit_results["summary"]["medium_vulnerabilities"] > 0:
risk_score = 50
elif audit_results["summary"]["low_vulnerabilities"] > 0:
risk_score = 25
else:
risk_score = 0
risk_level = "Critical" if risk_score >= 80 else "High" if risk_score >= 60 else "Medium" if risk_score >= 40 else "Low" if risk_score >= 20 else "Secure"
report = {
"security_audit": audit_results,
"risk_assessment": {
"risk_score": risk_score,
"risk_level": risk_level,
"overall_status": "Secure" if risk_score == 0 else "Needs Attention"
},
"compliance": {
"owasp_top_10": self._check_owasp_compliance(audit_results),
"cis_benchmarks": self._check_cis_compliance(audit_results)
},
"remediation_priority": self._prioritize_remediations(audit_results)
}
return report
def _check_owasp_compliance(self, audit_results: Dict[str, Any]) -> Dict[str, Any]:
"""Check OWASP Top 10 compliance"""
owasp_checks = {
"A01:2021-Broken Access Control": audit_results["summary"]["high_vulnerabilities"] == 0,
"A02:2021-Cryptographic Failures": audit_results["summary"]["high_vulnerabilities"] == 0,
"A03:2021-Injection": audit_results["summary"]["high_vulnerabilities"] == 0,
"A04:2021-Insecure Design": audit_results["summary"]["medium_vulnerabilities"] == 0,
"A05:2021-Security Misconfiguration": audit_results["summary"]["medium_vulnerabilities"] == 0
}
return {
"compliant": all(owasp_checks.values()),
"checks": owasp_checks
}
def _check_cis_compliance(self, audit_results: Dict[str, Any]) -> Dict[str, Any]:
"""Check CIS benchmarks compliance"""
# Simplified CIS compliance check
cis_checks = {
"Password Policy": audit_results["summary"]["medium_vulnerabilities"] == 0,
"Session Management": audit_results["summary"]["medium_vulnerabilities"] == 0,
"Input Validation": audit_results["summary"]["high_vulnerabilities"] == 0,
"File Permissions": audit_results["summary"]["medium_vulnerabilities"] == 0
}
return {
"compliant": all(cis_checks.values()),
"checks": cis_checks
}
def _prioritize_remediations(self, audit_results: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Prioritize remediation actions"""
remediations = []
# Critical vulnerabilities first
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
if vuln["severity"] == "critical":
remediations.append({
"priority": 1,
"severity": vuln["severity"],
"description": vuln["description"],
"remediation": vuln["remediation"],
"affected_component": vuln["affected_component"]
})
# High vulnerabilities second
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
if vuln["severity"] == "high":
remediations.append({
"priority": 2,
"severity": vuln["severity"],
"description": vuln["description"],
"remediation": vuln["remediation"],
"affected_component": vuln["affected_component"]
})
# Medium vulnerabilities third
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
if vuln["severity"] == "medium":
remediations.append({
"priority": 3,
"severity": vuln["severity"],
"description": vuln["description"],
"remediation": vuln["remediation"],
"affected_component": vuln["affected_component"]
})
# Low vulnerabilities last
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
if vuln["severity"] == "low":
remediations.append({
"priority": 4,
"severity": vuln["severity"],
"description": vuln["description"],
"remediation": vuln["remediation"],
"affected_component": vuln["affected_component"]
})
return remediations