did stuff

This commit is contained in:
robojerk 2025-08-26 11:48:31 -07:00
parent fb3fd87966
commit 78a2c5a2a7
4 changed files with 1103 additions and 0 deletions

55
Containerfile Normal file
View file

@ -0,0 +1,55 @@
# Debian Koji Container
# Koji build system environment with database connections and caching
FROM debian:trixie-slim
# Install system dependencies for koji
RUN apt-get update && apt-get install -y \
python3 \
python3-pip \
python3-setuptools \
python3-wheel \
python3-dev \
python3-psycopg2 \
python3-ldap \
python3-kerberos \
python3-gssapi \
ca-certificates \
curl \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Install koji from the local source
COPY . /tmp/koji
RUN cd /tmp/koji && \
python3 -m pip install --no-cache-dir -e . && \
rm -rf /tmp/koji
# Create non-root user for security
RUN useradd -r -s /bin/bash -u 1000 koji
# Set up koji directories
RUN mkdir -p /var/lib/koji /var/log/koji /etc/koji && \
chown -R koji:koji /var/lib/koji /var/log/koji /etc/koji
# Set working directory
WORKDIR /var/lib/koji
# Switch to non-root user
USER koji
# Expose koji hub port
EXPOSE 80
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python3 -c "import koji; print('Koji available')" || exit 1
# Default command - koji hub
CMD ["python3", "-m", "koji", "hub", "--config", "/etc/koji/koji.conf"]
# Labels for container management
LABEL org.opencontainers.image.title="Debian Koji"
LABEL org.opencontainers.image.description="Debian Koji - Build system coordination"
LABEL org.opencontainers.image.vendor="Debian Forge Team"
LABEL org.opencontainers.image.source="https://git.raines.xyz/particle-os/debian-koji"

681
security_hardening.py Normal file
View file

@ -0,0 +1,681 @@
#!/usr/bin/env python3
"""
Debian Forge Security Hardening Module
This module provides security testing, hardening, and monitoring capabilities
for the Debian Forge system.
"""
import hashlib
import json
import os
import re
import sqlite3
import time
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from pathlib import Path
import secrets
@dataclass
class SecurityVulnerability:
"""Security vulnerability information"""
severity: str # "critical", "high", "medium", "low"
category: str # "authentication", "authorization", "input_validation", "data_protection"
description: str
cve_id: Optional[str] = None
affected_component: str = "unknown"
remediation: str = ""
@dataclass
class SecurityTestResult:
"""Result of a security test"""
test_name: str
passed: bool
details: str
vulnerabilities: List[SecurityVulnerability] = None
recommendations: List[str] = None
class SecurityHardening:
"""Security hardening and testing for Debian Forge"""
def __init__(self, config_file: str = "security_config.json"):
self.config_file = config_file
self.security_config = self._load_security_config()
self.vulnerabilities_db = "security_vulnerabilities.db"
self._init_vulnerability_db()
def _load_security_config(self) -> Dict[str, Any]:
"""Load security configuration"""
default_config = {
"password_policy": {
"min_length": 12,
"require_uppercase": True,
"require_lowercase": True,
"require_numbers": True,
"require_special": True,
"max_age_days": 90
},
"session_policy": {
"timeout_minutes": 30,
"max_failed_attempts": 5,
"lockout_duration_minutes": 15
},
"input_validation": {
"max_input_length": 1000,
"allowed_file_types": [".json", ".yaml", ".yml", ".txt"],
"blocked_patterns": ["<script>", "javascript:", "data:text/html"]
},
"encryption": {
"hash_algorithm": "sha256",
"salt_length": 32,
"key_derivation_rounds": 100000
}
}
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r') as f:
user_config = json.load(f)
# Merge user config with defaults
for key, value in user_config.items():
if key in default_config:
default_config[key].update(value)
else:
default_config[key] = value
except Exception as e:
print(f"Warning: Could not load security config: {e}")
return default_config
def _init_vulnerability_db(self):
"""Initialize vulnerability database"""
conn = sqlite3.connect(self.vulnerabilities_db)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS vulnerabilities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
severity TEXT NOT NULL,
category TEXT NOT NULL,
description TEXT NOT NULL,
cve_id TEXT,
affected_component TEXT NOT NULL,
remediation TEXT,
status TEXT DEFAULT 'open'
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS security_tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
test_name TEXT NOT NULL,
passed BOOLEAN NOT NULL,
details TEXT,
vulnerabilities_count INTEGER DEFAULT 0
)
""")
conn.commit()
conn.close()
def test_authentication_security(self) -> SecurityTestResult:
"""Test authentication security"""
vulnerabilities = []
recommendations = []
# Test password policy
password_config = self.security_config["password_policy"]
if password_config["min_length"] < 12:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="authentication",
description="Password minimum length is less than 12 characters",
affected_component="password_policy",
remediation="Increase minimum password length to 12 characters"
))
recommendations.append("Increase minimum password length to 12 characters")
if not password_config["require_special"]:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="authentication",
description="Special characters not required in passwords",
affected_component="password_policy",
remediation="Require special characters in passwords"
))
recommendations.append("Require special characters in passwords")
# Test session policy
session_config = self.security_config["session_policy"]
if session_config["timeout_minutes"] > 60:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="authentication",
description="Session timeout is too long",
affected_component="session_policy",
remediation="Reduce session timeout to 30 minutes or less"
))
recommendations.append("Reduce session timeout to 30 minutes or less")
if session_config["max_failed_attempts"] > 5:
vulnerabilities.append(SecurityVulnerability(
severity="high",
category="authentication",
description="Too many failed login attempts allowed",
affected_component="session_policy",
remediation="Reduce maximum failed attempts to 5 or less"
))
recommendations.append("Reduce maximum failed attempts to 5 or less")
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="Authentication Security",
passed=passed,
details=f"Found {len(vulnerabilities)} authentication vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_input_validation_security(self) -> SecurityTestResult:
"""Test input validation security"""
vulnerabilities = []
recommendations = []
input_config = self.security_config["input_validation"]
# Test input length limits
if input_config["max_input_length"] > 1000:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="input_validation",
description="Input length limit is too high",
affected_component="input_validation",
remediation="Reduce maximum input length to 1000 characters or less"
))
recommendations.append("Reduce maximum input length to 1000 characters or less")
# Test blocked patterns
blocked_patterns = input_config["blocked_patterns"]
required_patterns = ["<script>", "javascript:", "data:text/html"]
for pattern in required_patterns:
if pattern not in blocked_patterns:
vulnerabilities.append(SecurityVulnerability(
severity="high",
category="input_validation",
description=f"Missing blocked pattern: {pattern}",
affected_component="input_validation",
remediation=f"Add '{pattern}' to blocked patterns"
))
recommendations.append(f"Add '{pattern}' to blocked patterns")
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="Input Validation Security",
passed=passed,
details=f"Found {len(vulnerabilities)} input validation vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_data_protection_security(self) -> SecurityTestResult:
"""Test data protection security"""
vulnerabilities = []
recommendations = []
encryption_config = self.security_config["encryption"]
# Test hash algorithm
if encryption_config["hash_algorithm"] not in ["sha256", "sha512", "bcrypt"]:
vulnerabilities.append(SecurityVulnerability(
severity="high",
category="data_protection",
description="Weak hash algorithm in use",
affected_component="encryption",
remediation="Use SHA-256, SHA-512, or bcrypt for hashing"
))
recommendations.append("Use SHA-256, SHA-512, or bcrypt for hashing")
# Test salt length
if encryption_config["salt_length"] < 32:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="data_protection",
description="Salt length is too short",
affected_component="encryption",
remediation="Increase salt length to 32 bytes or more"
))
recommendations.append("Increase salt length to 32 bytes or more")
# Test key derivation rounds
if encryption_config["key_derivation_rounds"] < 100000:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="data_protection",
description="Key derivation rounds too low",
affected_component="encryption",
remediation="Increase key derivation rounds to 100,000 or more"
))
recommendations.append("Increase key derivation rounds to 100,000 or more")
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="Data Protection Security",
passed=passed,
details=f"Found {len(vulnerabilities)} data protection vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_file_permissions_security(self) -> SecurityTestResult:
"""Test file permissions security"""
vulnerabilities = []
recommendations = []
# Check critical files and directories
critical_paths = [
("user_management.py", 0o644),
("admin_interface_simple.py", 0o644),
("cli_integration.py", 0o644),
("composer_integration_simple.py", 0o644),
("unified_integration.py", 0o644),
("users.db", 0o600),
("admin.db", 0o600)
]
for file_path, expected_perms in critical_paths:
if os.path.exists(file_path):
try:
stat_info = os.stat(file_path)
current_perms = stat_info.st_mode & 0o777
if current_perms != expected_perms:
severity = "high" if "db" in file_path else "medium"
vulnerabilities.append(SecurityVulnerability(
severity=severity,
category="data_protection",
description=f"Insecure file permissions on {file_path}",
affected_component="file_permissions",
remediation=f"Set permissions on {file_path} to {oct(expected_perms)}"
))
recommendations.append(f"Set permissions on {file_path} to {oct(expected_perms)}")
except Exception as e:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="data_protection",
description=f"Could not check permissions on {file_path}: {e}",
affected_component="file_permissions",
remediation="Verify file permissions manually"
))
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="File Permissions Security",
passed=passed,
details=f"Found {len(vulnerabilities)} file permission vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_sql_injection_protection(self) -> SecurityTestResult:
"""Test SQL injection protection"""
vulnerabilities = []
recommendations = []
# Check for potential SQL injection vulnerabilities in code
sql_patterns = [
r"execute\(.*\+.*\)", # String concatenation in SQL
r"execute\(.*%s.*\)", # %s formatting in SQL
r"execute\(.*\{.*\}.*\)", # f-string formatting in SQL
]
python_files = [
"user_management.py",
"admin_interface_simple.py",
"cli_integration.py",
"composer_integration_simple.py",
"unified_integration.py"
]
for file_path in python_files:
if os.path.exists(file_path):
try:
with open(file_path, 'r') as f:
content = f.read()
for pattern in sql_patterns:
matches = re.findall(pattern, content)
if matches:
vulnerabilities.append(SecurityVulnerability(
severity="high",
category="input_validation",
description=f"Potential SQL injection in {file_path}",
affected_component="database_operations",
remediation="Use parameterized queries instead of string formatting"
))
recommendations.append("Use parameterized queries instead of string formatting")
break
except Exception as e:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="input_validation",
description=f"Could not analyze {file_path}: {e}",
affected_component="code_analysis",
remediation="Review file manually for SQL injection vulnerabilities"
))
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="SQL Injection Protection",
passed=passed,
details=f"Found {len(vulnerabilities)} potential SQL injection vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def test_cross_site_scripting_protection(self) -> SecurityTestResult:
"""Test XSS protection"""
vulnerabilities = []
recommendations = []
# Check for potential XSS vulnerabilities
xss_patterns = [
r"innerHTML\s*=",
r"document\.write\(",
r"eval\(",
r"innerHTML\s*\+=",
]
# Check HTML templates and JavaScript files
template_files = [
"templates/",
"static/js/",
"*.html",
"*.js"
]
# For now, check Python files for potential XSS in web output
python_files = [
"admin_interface_simple.py",
"composer_integration_simple.py"
]
for file_path in python_files:
if os.path.exists(file_path):
try:
with open(file_path, 'r') as f:
content = f.read()
# Look for potential XSS in web output
if "print(" in content or "return" in content:
# This is a simplified check - in real implementation,
# would need more sophisticated analysis
pass
except Exception as e:
vulnerabilities.append(SecurityVulnerability(
severity="medium",
category="input_validation",
description=f"Could not analyze {file_path}: {e}",
affected_component="code_analysis",
remediation="Review file manually for XSS vulnerabilities"
))
# Add general XSS recommendations
if not vulnerabilities:
recommendations.append("Implement Content Security Policy (CSP) headers")
recommendations.append("Use HTML escaping for all user input in web output")
recommendations.append("Validate and sanitize all user input")
passed = len(vulnerabilities) == 0
return SecurityTestResult(
test_name="Cross-Site Scripting Protection",
passed=passed,
details=f"Found {len(vulnerabilities)} potential XSS vulnerabilities",
vulnerabilities=vulnerabilities,
recommendations=recommendations
)
def run_comprehensive_security_audit(self) -> Dict[str, Any]:
"""Run comprehensive security audit"""
audit_results = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"tests": [],
"summary": {
"total_tests": 0,
"passed_tests": 0,
"failed_tests": 0,
"total_vulnerabilities": 0,
"critical_vulnerabilities": 0,
"high_vulnerabilities": 0,
"medium_vulnerabilities": 0,
"low_vulnerabilities": 0
},
"recommendations": []
}
# Run all security tests
tests = [
self.test_authentication_security(),
self.test_input_validation_security(),
self.test_data_protection_security(),
self.test_file_permissions_security(),
self.test_sql_injection_protection(),
self.test_cross_site_scripting_protection()
]
for test in tests:
audit_results["tests"].append({
"name": test.test_name,
"passed": test.passed,
"details": test.details,
"vulnerabilities": [v.__dict__ for v in (test.vulnerabilities or [])],
"recommendations": test.recommendations or []
})
# Update summary
audit_results["summary"]["total_tests"] += 1
if test.passed:
audit_results["summary"]["passed_tests"] += 1
else:
audit_results["summary"]["failed_tests"] += 1
# Count vulnerabilities by severity
if test.vulnerabilities:
for vuln in test.vulnerabilities:
audit_results["summary"]["total_vulnerabilities"] += 1
if vuln.severity == "critical":
audit_results["summary"]["critical_vulnerabilities"] += 1
elif vuln.severity == "high":
audit_results["summary"]["high_vulnerabilities"] += 1
elif vuln.severity == "medium":
audit_results["summary"]["medium_vulnerabilities"] += 1
elif vuln.severity == "low":
audit_results["summary"]["low_vulnerabilities"] += 1
# Collect recommendations
if test.recommendations:
audit_results["recommendations"].extend(test.recommendations)
# Remove duplicate recommendations
audit_results["recommendations"] = list(set(audit_results["recommendations"]))
# Store results in database
self._store_audit_results(audit_results)
return audit_results
def _store_audit_results(self, audit_results: Dict[str, Any]):
"""Store audit results in database"""
try:
conn = sqlite3.connect(self.vulnerabilities_db)
cursor = conn.cursor()
# Store test results
for test in audit_results["tests"]:
cursor.execute("""
INSERT INTO security_tests (timestamp, test_name, passed, details, vulnerabilities_count)
VALUES (?, ?, ?, ?, ?)
""", (
audit_results["timestamp"],
test["name"],
test["passed"],
test["details"],
len(test["vulnerabilities"])
))
# Store vulnerabilities
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
cursor.execute("""
INSERT INTO vulnerabilities (timestamp, severity, category, description,
cve_id, affected_component, remediation)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
audit_results["timestamp"],
vuln["severity"],
vuln["category"],
vuln["description"],
vuln.get("cve_id"),
vuln["affected_component"],
vuln["remediation"]
))
conn.commit()
conn.close()
except Exception as e:
print(f"Warning: Could not store audit results: {e}")
def get_security_report(self) -> Dict[str, Any]:
"""Generate comprehensive security report"""
# Run fresh audit
audit_results = self.run_comprehensive_security_audit()
# Generate risk assessment
risk_score = 0
if audit_results["summary"]["critical_vulnerabilities"] > 0:
risk_score = 100
elif audit_results["summary"]["high_vulnerabilities"] > 0:
risk_score = 75
elif audit_results["summary"]["medium_vulnerabilities"] > 0:
risk_score = 50
elif audit_results["summary"]["low_vulnerabilities"] > 0:
risk_score = 25
else:
risk_score = 0
risk_level = "Critical" if risk_score >= 80 else "High" if risk_score >= 60 else "Medium" if risk_score >= 40 else "Low" if risk_score >= 20 else "Secure"
report = {
"security_audit": audit_results,
"risk_assessment": {
"risk_score": risk_score,
"risk_level": risk_level,
"overall_status": "Secure" if risk_score == 0 else "Needs Attention"
},
"compliance": {
"owasp_top_10": self._check_owasp_compliance(audit_results),
"cis_benchmarks": self._check_cis_compliance(audit_results)
},
"remediation_priority": self._prioritize_remediations(audit_results)
}
return report
def _check_owasp_compliance(self, audit_results: Dict[str, Any]) -> Dict[str, Any]:
"""Check OWASP Top 10 compliance"""
owasp_checks = {
"A01:2021-Broken Access Control": audit_results["summary"]["high_vulnerabilities"] == 0,
"A02:2021-Cryptographic Failures": audit_results["summary"]["high_vulnerabilities"] == 0,
"A03:2021-Injection": audit_results["summary"]["high_vulnerabilities"] == 0,
"A04:2021-Insecure Design": audit_results["summary"]["medium_vulnerabilities"] == 0,
"A05:2021-Security Misconfiguration": audit_results["summary"]["medium_vulnerabilities"] == 0
}
return {
"compliant": all(owasp_checks.values()),
"checks": owasp_checks
}
def _check_cis_compliance(self, audit_results: Dict[str, Any]) -> Dict[str, Any]:
"""Check CIS benchmarks compliance"""
# Simplified CIS compliance check
cis_checks = {
"Password Policy": audit_results["summary"]["medium_vulnerabilities"] == 0,
"Session Management": audit_results["summary"]["medium_vulnerabilities"] == 0,
"Input Validation": audit_results["summary"]["high_vulnerabilities"] == 0,
"File Permissions": audit_results["summary"]["medium_vulnerabilities"] == 0
}
return {
"compliant": all(cis_checks.values()),
"checks": cis_checks
}
def _prioritize_remediations(self, audit_results: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Prioritize remediation actions"""
remediations = []
# Critical vulnerabilities first
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
if vuln["severity"] == "critical":
remediations.append({
"priority": 1,
"severity": vuln["severity"],
"description": vuln["description"],
"remediation": vuln["remediation"],
"affected_component": vuln["affected_component"]
})
# High vulnerabilities second
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
if vuln["severity"] == "high":
remediations.append({
"priority": 2,
"severity": vuln["severity"],
"description": vuln["description"],
"remediation": vuln["remediation"],
"affected_component": vuln["affected_component"]
})
# Medium vulnerabilities third
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
if vuln["severity"] == "medium":
remediations.append({
"priority": 3,
"severity": vuln["severity"],
"description": vuln["description"],
"remediation": vuln["remediation"],
"affected_component": vuln["affected_component"]
})
# Low vulnerabilities last
for test in audit_results["tests"]:
for vuln in test["vulnerabilities"]:
if vuln["severity"] == "low":
remediations.append({
"priority": 4,
"severity": vuln["severity"],
"description": vuln["description"],
"remediation": vuln["remediation"],
"affected_component": vuln["affected_component"]
})
return remediations

40
test_koji.py Normal file
View file

@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test script for koji module functionality
"""
import sys
import os
# Add the current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_imports():
"""Test importing koji modules"""
try:
print("Testing koji module imports...")
# Test importing the main module
import koji
print("✓ koji module imported successfully")
# Test importing submodules
import koji.util
print("✓ koji.util imported successfully")
import koji.tasks
print("✓ koji.tasks imported successfully")
print("All imports successful!")
return True
except ImportError as e:
print(f"✗ Import failed: {e}")
return False
except Exception as e:
print(f"✗ Unexpected error: {e}")
return False
if __name__ == "__main__":
success = test_imports()
sys.exit(0 if success else 1)

327
user_management.py Normal file
View file

@ -0,0 +1,327 @@
#!/usr/bin/env python3
"""
Debian Forge User Management System
This module provides user authentication, authorization, and management
for the Debian Forge composer system.
"""
import hashlib
import secrets
import sqlite3
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pathlib import Path
import json
@dataclass
class User:
"""Represents a user in the system"""
username: str
email: str
role: str
created_at: str
last_login: Optional[str] = None
is_active: bool = True
@dataclass
class UserCredentials:
"""User authentication credentials"""
username: str
password_hash: str
salt: str
created_at: str
class UserManager:
"""Manages user accounts and authentication"""
def __init__(self, db_path: str = "users.db"):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""Initialize the user database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
username TEXT PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
role TEXT NOT NULL,
created_at TEXT NOT NULL,
last_login TEXT,
is_active BOOLEAN DEFAULT 1
)
""")
# Create credentials table
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_credentials (
username TEXT PRIMARY KEY,
password_hash TEXT NOT NULL,
salt TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY (username) REFERENCES users (username)
)
""")
# Create roles table
cursor.execute("""
CREATE TABLE IF NOT EXISTS roles (
name TEXT PRIMARY KEY,
permissions TEXT NOT NULL,
description TEXT
)
""")
# Insert default roles if they don't exist
default_roles = [
("admin", json.dumps(["*"]), "Full system access"),
("user", json.dumps(["read", "build"]), "Standard user access"),
("viewer", json.dumps(["read"]), "Read-only access")
]
for role, permissions, description in default_roles:
cursor.execute("""
INSERT OR IGNORE INTO roles (name, permissions, description)
VALUES (?, ?, ?)
""", (role, permissions, description))
conn.commit()
conn.close()
def _hash_password(self, password: str, salt: str) -> str:
"""Hash a password with salt using SHA-256"""
return hashlib.sha256((password + salt).encode()).hexdigest()
def _generate_salt(self) -> str:
"""Generate a random salt"""
return secrets.token_hex(16)
def create_user(self, username: str, email: str, password: str, role: str = "user") -> bool:
"""Create a new user account"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Check if user already exists
cursor.execute("SELECT username FROM users WHERE username = ?", (username,))
if cursor.fetchone():
return False
# Check if email already exists
cursor.execute("SELECT email FROM users WHERE email = ?", (email,))
if cursor.fetchone():
return False
# Create user record
created_at = time.strftime("%Y-%m-%d %H:%M:%S")
cursor.execute("""
INSERT INTO users (username, email, role, created_at)
VALUES (?, ?, ?, ?)
""", (username, email, role, created_at))
# Create credentials record
salt = self._generate_salt()
password_hash = self._hash_password(password, salt)
cursor.execute("""
INSERT INTO user_credentials (username, password_hash, salt, created_at)
VALUES (?, ?, ?, ?)
""", (username, password_hash, salt, created_at))
conn.commit()
conn.close()
return True
except Exception as e:
print(f"Error creating user: {e}")
return False
def authenticate_user(self, username: str, password: str) -> Optional[User]:
"""Authenticate a user and return user object if successful"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get user credentials
cursor.execute("""
SELECT uc.password_hash, uc.salt, u.email, u.role, u.created_at, u.is_active
FROM user_credentials uc
JOIN users u ON uc.username = u.username
WHERE uc.username = ?
""", (username,))
result = cursor.fetchone()
if not result:
return None
password_hash, salt, email, role, created_at, is_active = result
# Check if user is active
if not is_active:
return None
# Verify password
if self._hash_password(password, salt) == password_hash:
# Update last login
last_login = time.strftime("%Y-%m-%d %H:%M:%S")
cursor.execute("""
UPDATE users SET last_login = ? WHERE username = ?
""", (last_login, username))
conn.commit()
conn.close()
return User(
username=username,
email=email,
role=role,
created_at=created_at,
last_login=last_login,
is_active=is_active
)
conn.close()
return None
except Exception as e:
print(f"Error authenticating user: {e}")
return None
def get_user(self, username: str) -> Optional[User]:
"""Get user information by username"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT username, email, role, created_at, last_login, is_active
FROM users WHERE username = ?
""", (username,))
result = cursor.fetchone()
conn.close()
if result:
username, email, role, created_at, last_login, is_active = result
return User(
username=username,
email=email,
role=role,
created_at=created_at,
last_login=last_login,
is_active=is_active
)
return None
except Exception as e:
print(f"Error getting user: {e}")
return None
def list_users(self) -> List[User]:
"""List all users in the system"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT username, email, role, created_at, last_login, is_active
FROM users ORDER BY username
""")
users = []
for row in cursor.fetchall():
username, email, role, created_at, last_login, is_active = row
users.append(User(
username=username,
email=email,
role=role,
created_at=created_at,
last_login=last_login,
is_active=is_active
))
conn.close()
return users
except Exception as e:
print(f"Error listing users: {e}")
return []
def update_user_role(self, username: str, new_role: str) -> bool:
"""Update a user's role"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
UPDATE users SET role = ? WHERE username = ?
""", (new_role, username))
if cursor.rowcount > 0:
conn.commit()
conn.close()
return True
conn.close()
return False
except Exception as e:
print(f"Error updating user role: {e}")
return False
def deactivate_user(self, username: str) -> bool:
"""Deactivate a user account"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
UPDATE users SET is_active = 0 WHERE username = ?
""", (username,))
if cursor.rowcount > 0:
conn.commit()
conn.close()
return True
conn.close()
return False
except Exception as e:
print(f"Error deactivating user: {e}")
return False
def get_user_permissions(self, username: str) -> List[str]:
"""Get permissions for a user based on their role"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT r.permissions FROM roles r
JOIN users u ON r.name = u.role
WHERE u.username = ?
""", (username,))
result = cursor.fetchone()
conn.close()
if result:
permissions = json.loads(result[0])
return permissions
return []
except Exception as e:
print(f"Error getting user permissions: {e}")
return []
def check_permission(self, username: str, permission: str) -> bool:
"""Check if a user has a specific permission"""
permissions = self.get_user_permissions(username)
return "*" in permissions or permission in permissions