debian-koji/user_management.py
2025-08-26 11:48:31 -07:00

327 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Debian Forge User Management System
This module provides user authentication, authorization, and management
for the Debian Forge composer system.
"""
import hashlib
import secrets
import sqlite3
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pathlib import Path
import json
@dataclass
class User:
"""Represents a user in the system"""
username: str
email: str
role: str
created_at: str
last_login: Optional[str] = None
is_active: bool = True
@dataclass
class UserCredentials:
"""User authentication credentials"""
username: str
password_hash: str
salt: str
created_at: str
class UserManager:
"""Manages user accounts and authentication"""
def __init__(self, db_path: str = "users.db"):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""Initialize the user database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
username TEXT PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
role TEXT NOT NULL,
created_at TEXT NOT NULL,
last_login TEXT,
is_active BOOLEAN DEFAULT 1
)
""")
# Create credentials table
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_credentials (
username TEXT PRIMARY KEY,
password_hash TEXT NOT NULL,
salt TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY (username) REFERENCES users (username)
)
""")
# Create roles table
cursor.execute("""
CREATE TABLE IF NOT EXISTS roles (
name TEXT PRIMARY KEY,
permissions TEXT NOT NULL,
description TEXT
)
""")
# Insert default roles if they don't exist
default_roles = [
("admin", json.dumps(["*"]), "Full system access"),
("user", json.dumps(["read", "build"]), "Standard user access"),
("viewer", json.dumps(["read"]), "Read-only access")
]
for role, permissions, description in default_roles:
cursor.execute("""
INSERT OR IGNORE INTO roles (name, permissions, description)
VALUES (?, ?, ?)
""", (role, permissions, description))
conn.commit()
conn.close()
def _hash_password(self, password: str, salt: str) -> str:
"""Hash a password with salt using SHA-256"""
return hashlib.sha256((password + salt).encode()).hexdigest()
def _generate_salt(self) -> str:
"""Generate a random salt"""
return secrets.token_hex(16)
def create_user(self, username: str, email: str, password: str, role: str = "user") -> bool:
"""Create a new user account"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Check if user already exists
cursor.execute("SELECT username FROM users WHERE username = ?", (username,))
if cursor.fetchone():
return False
# Check if email already exists
cursor.execute("SELECT email FROM users WHERE email = ?", (email,))
if cursor.fetchone():
return False
# Create user record
created_at = time.strftime("%Y-%m-%d %H:%M:%S")
cursor.execute("""
INSERT INTO users (username, email, role, created_at)
VALUES (?, ?, ?, ?)
""", (username, email, role, created_at))
# Create credentials record
salt = self._generate_salt()
password_hash = self._hash_password(password, salt)
cursor.execute("""
INSERT INTO user_credentials (username, password_hash, salt, created_at)
VALUES (?, ?, ?, ?)
""", (username, password_hash, salt, created_at))
conn.commit()
conn.close()
return True
except Exception as e:
print(f"Error creating user: {e}")
return False
def authenticate_user(self, username: str, password: str) -> Optional[User]:
"""Authenticate a user and return user object if successful"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get user credentials
cursor.execute("""
SELECT uc.password_hash, uc.salt, u.email, u.role, u.created_at, u.is_active
FROM user_credentials uc
JOIN users u ON uc.username = u.username
WHERE uc.username = ?
""", (username,))
result = cursor.fetchone()
if not result:
return None
password_hash, salt, email, role, created_at, is_active = result
# Check if user is active
if not is_active:
return None
# Verify password
if self._hash_password(password, salt) == password_hash:
# Update last login
last_login = time.strftime("%Y-%m-%d %H:%M:%S")
cursor.execute("""
UPDATE users SET last_login = ? WHERE username = ?
""", (last_login, username))
conn.commit()
conn.close()
return User(
username=username,
email=email,
role=role,
created_at=created_at,
last_login=last_login,
is_active=is_active
)
conn.close()
return None
except Exception as e:
print(f"Error authenticating user: {e}")
return None
def get_user(self, username: str) -> Optional[User]:
"""Get user information by username"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT username, email, role, created_at, last_login, is_active
FROM users WHERE username = ?
""", (username,))
result = cursor.fetchone()
conn.close()
if result:
username, email, role, created_at, last_login, is_active = result
return User(
username=username,
email=email,
role=role,
created_at=created_at,
last_login=last_login,
is_active=is_active
)
return None
except Exception as e:
print(f"Error getting user: {e}")
return None
def list_users(self) -> List[User]:
"""List all users in the system"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT username, email, role, created_at, last_login, is_active
FROM users ORDER BY username
""")
users = []
for row in cursor.fetchall():
username, email, role, created_at, last_login, is_active = row
users.append(User(
username=username,
email=email,
role=role,
created_at=created_at,
last_login=last_login,
is_active=is_active
))
conn.close()
return users
except Exception as e:
print(f"Error listing users: {e}")
return []
def update_user_role(self, username: str, new_role: str) -> bool:
"""Update a user's role"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
UPDATE users SET role = ? WHERE username = ?
""", (new_role, username))
if cursor.rowcount > 0:
conn.commit()
conn.close()
return True
conn.close()
return False
except Exception as e:
print(f"Error updating user role: {e}")
return False
def deactivate_user(self, username: str) -> bool:
"""Deactivate a user account"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
UPDATE users SET is_active = 0 WHERE username = ?
""", (username,))
if cursor.rowcount > 0:
conn.commit()
conn.close()
return True
conn.close()
return False
except Exception as e:
print(f"Error deactivating user: {e}")
return False
def get_user_permissions(self, username: str) -> List[str]:
"""Get permissions for a user based on their role"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT r.permissions FROM roles r
JOIN users u ON r.name = u.role
WHERE u.username = ?
""", (username,))
result = cursor.fetchone()
conn.close()
if result:
permissions = json.loads(result[0])
return permissions
return []
except Exception as e:
print(f"Error getting user permissions: {e}")
return []
def check_permission(self, username: str, permission: str) -> bool:
"""Check if a user has a specific permission"""
permissions = self.get_user_permissions(username)
return "*" in permissions or permission in permissions