#!/usr/bin/env python3 """ Debian Forge User Management System This module provides user authentication, authorization, and management for the Debian Forge composer system. """ import hashlib import secrets import sqlite3 import time from typing import Dict, List, Optional, Any from dataclasses import dataclass from pathlib import Path import json @dataclass class User: """Represents a user in the system""" username: str email: str role: str created_at: str last_login: Optional[str] = None is_active: bool = True @dataclass class UserCredentials: """User authentication credentials""" username: str password_hash: str salt: str created_at: str class UserManager: """Manages user accounts and authentication""" def __init__(self, db_path: str = "users.db"): self.db_path = db_path self._init_database() def _init_database(self): """Initialize the user database""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create users table cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( username TEXT PRIMARY KEY, email TEXT UNIQUE NOT NULL, role TEXT NOT NULL, created_at TEXT NOT NULL, last_login TEXT, is_active BOOLEAN DEFAULT 1 ) """) # Create credentials table cursor.execute(""" CREATE TABLE IF NOT EXISTS user_credentials ( username TEXT PRIMARY KEY, password_hash TEXT NOT NULL, salt TEXT NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY (username) REFERENCES users (username) ) """) # Create roles table cursor.execute(""" CREATE TABLE IF NOT EXISTS roles ( name TEXT PRIMARY KEY, permissions TEXT NOT NULL, description TEXT ) """) # Insert default roles if they don't exist default_roles = [ ("admin", json.dumps(["*"]), "Full system access"), ("user", json.dumps(["read", "build"]), "Standard user access"), ("viewer", json.dumps(["read"]), "Read-only access") ] for role, permissions, description in default_roles: cursor.execute(""" INSERT OR IGNORE INTO roles (name, permissions, description) VALUES (?, ?, ?) """, (role, permissions, description)) conn.commit() conn.close() def _hash_password(self, password: str, salt: str) -> str: """Hash a password with salt using SHA-256""" return hashlib.sha256((password + salt).encode()).hexdigest() def _generate_salt(self) -> str: """Generate a random salt""" return secrets.token_hex(16) def create_user(self, username: str, email: str, password: str, role: str = "user") -> bool: """Create a new user account""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Check if user already exists cursor.execute("SELECT username FROM users WHERE username = ?", (username,)) if cursor.fetchone(): return False # Check if email already exists cursor.execute("SELECT email FROM users WHERE email = ?", (email,)) if cursor.fetchone(): return False # Create user record created_at = time.strftime("%Y-%m-%d %H:%M:%S") cursor.execute(""" INSERT INTO users (username, email, role, created_at) VALUES (?, ?, ?, ?) """, (username, email, role, created_at)) # Create credentials record salt = self._generate_salt() password_hash = self._hash_password(password, salt) cursor.execute(""" INSERT INTO user_credentials (username, password_hash, salt, created_at) VALUES (?, ?, ?, ?) """, (username, password_hash, salt, created_at)) conn.commit() conn.close() return True except Exception as e: print(f"Error creating user: {e}") return False def authenticate_user(self, username: str, password: str) -> Optional[User]: """Authenticate a user and return user object if successful""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Get user credentials cursor.execute(""" SELECT uc.password_hash, uc.salt, u.email, u.role, u.created_at, u.is_active FROM user_credentials uc JOIN users u ON uc.username = u.username WHERE uc.username = ? """, (username,)) result = cursor.fetchone() if not result: return None password_hash, salt, email, role, created_at, is_active = result # Check if user is active if not is_active: return None # Verify password if self._hash_password(password, salt) == password_hash: # Update last login last_login = time.strftime("%Y-%m-%d %H:%M:%S") cursor.execute(""" UPDATE users SET last_login = ? WHERE username = ? """, (last_login, username)) conn.commit() conn.close() return User( username=username, email=email, role=role, created_at=created_at, last_login=last_login, is_active=is_active ) conn.close() return None except Exception as e: print(f"Error authenticating user: {e}") return None def get_user(self, username: str) -> Optional[User]: """Get user information by username""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" SELECT username, email, role, created_at, last_login, is_active FROM users WHERE username = ? """, (username,)) result = cursor.fetchone() conn.close() if result: username, email, role, created_at, last_login, is_active = result return User( username=username, email=email, role=role, created_at=created_at, last_login=last_login, is_active=is_active ) return None except Exception as e: print(f"Error getting user: {e}") return None def list_users(self) -> List[User]: """List all users in the system""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" SELECT username, email, role, created_at, last_login, is_active FROM users ORDER BY username """) users = [] for row in cursor.fetchall(): username, email, role, created_at, last_login, is_active = row users.append(User( username=username, email=email, role=role, created_at=created_at, last_login=last_login, is_active=is_active )) conn.close() return users except Exception as e: print(f"Error listing users: {e}") return [] def update_user_role(self, username: str, new_role: str) -> bool: """Update a user's role""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" UPDATE users SET role = ? WHERE username = ? """, (new_role, username)) if cursor.rowcount > 0: conn.commit() conn.close() return True conn.close() return False except Exception as e: print(f"Error updating user role: {e}") return False def deactivate_user(self, username: str) -> bool: """Deactivate a user account""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" UPDATE users SET is_active = 0 WHERE username = ? """, (username,)) if cursor.rowcount > 0: conn.commit() conn.close() return True conn.close() return False except Exception as e: print(f"Error deactivating user: {e}") return False def get_user_permissions(self, username: str) -> List[str]: """Get permissions for a user based on their role""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" SELECT r.permissions FROM roles r JOIN users u ON r.name = u.role WHERE u.username = ? """, (username,)) result = cursor.fetchone() conn.close() if result: permissions = json.loads(result[0]) return permissions return [] except Exception as e: print(f"Error getting user permissions: {e}") return [] def check_permission(self, username: str, permission: str) -> bool: """Check if a user has a specific permission""" permissions = self.get_user_permissions(username) return "*" in permissions or permission in permissions