327 lines
10 KiB
Python
327 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Debian Forge User Management System
|
|
|
|
This module provides user authentication, authorization, and management
|
|
for the Debian Forge composer system.
|
|
"""
|
|
|
|
import hashlib
|
|
import secrets
|
|
import sqlite3
|
|
import time
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
import json
|
|
|
|
@dataclass
|
|
class User:
|
|
"""Represents a user in the system"""
|
|
username: str
|
|
email: str
|
|
role: str
|
|
created_at: str
|
|
last_login: Optional[str] = None
|
|
is_active: bool = True
|
|
|
|
@dataclass
|
|
class UserCredentials:
|
|
"""User authentication credentials"""
|
|
username: str
|
|
password_hash: str
|
|
salt: str
|
|
created_at: str
|
|
|
|
class UserManager:
|
|
"""Manages user accounts and authentication"""
|
|
|
|
def __init__(self, db_path: str = "users.db"):
|
|
self.db_path = db_path
|
|
self._init_database()
|
|
|
|
def _init_database(self):
|
|
"""Initialize the user database"""
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Create users table
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
username TEXT PRIMARY KEY,
|
|
email TEXT UNIQUE NOT NULL,
|
|
role TEXT NOT NULL,
|
|
created_at TEXT NOT NULL,
|
|
last_login TEXT,
|
|
is_active BOOLEAN DEFAULT 1
|
|
)
|
|
""")
|
|
|
|
# Create credentials table
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS user_credentials (
|
|
username TEXT PRIMARY KEY,
|
|
password_hash TEXT NOT NULL,
|
|
salt TEXT NOT NULL,
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY (username) REFERENCES users (username)
|
|
)
|
|
""")
|
|
|
|
# Create roles table
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS roles (
|
|
name TEXT PRIMARY KEY,
|
|
permissions TEXT NOT NULL,
|
|
description TEXT
|
|
)
|
|
""")
|
|
|
|
# Insert default roles if they don't exist
|
|
default_roles = [
|
|
("admin", json.dumps(["*"]), "Full system access"),
|
|
("user", json.dumps(["read", "build"]), "Standard user access"),
|
|
("viewer", json.dumps(["read"]), "Read-only access")
|
|
]
|
|
|
|
for role, permissions, description in default_roles:
|
|
cursor.execute("""
|
|
INSERT OR IGNORE INTO roles (name, permissions, description)
|
|
VALUES (?, ?, ?)
|
|
""", (role, permissions, description))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def _hash_password(self, password: str, salt: str) -> str:
|
|
"""Hash a password with salt using SHA-256"""
|
|
return hashlib.sha256((password + salt).encode()).hexdigest()
|
|
|
|
def _generate_salt(self) -> str:
|
|
"""Generate a random salt"""
|
|
return secrets.token_hex(16)
|
|
|
|
def create_user(self, username: str, email: str, password: str, role: str = "user") -> bool:
|
|
"""Create a new user account"""
|
|
try:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Check if user already exists
|
|
cursor.execute("SELECT username FROM users WHERE username = ?", (username,))
|
|
if cursor.fetchone():
|
|
return False
|
|
|
|
# Check if email already exists
|
|
cursor.execute("SELECT email FROM users WHERE email = ?", (email,))
|
|
if cursor.fetchone():
|
|
return False
|
|
|
|
# Create user record
|
|
created_at = time.strftime("%Y-%m-%d %H:%M:%S")
|
|
cursor.execute("""
|
|
INSERT INTO users (username, email, role, created_at)
|
|
VALUES (?, ?, ?, ?)
|
|
""", (username, email, role, created_at))
|
|
|
|
# Create credentials record
|
|
salt = self._generate_salt()
|
|
password_hash = self._hash_password(password, salt)
|
|
cursor.execute("""
|
|
INSERT INTO user_credentials (username, password_hash, salt, created_at)
|
|
VALUES (?, ?, ?, ?)
|
|
""", (username, password_hash, salt, created_at))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Error creating user: {e}")
|
|
return False
|
|
|
|
def authenticate_user(self, username: str, password: str) -> Optional[User]:
|
|
"""Authenticate a user and return user object if successful"""
|
|
try:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Get user credentials
|
|
cursor.execute("""
|
|
SELECT uc.password_hash, uc.salt, u.email, u.role, u.created_at, u.is_active
|
|
FROM user_credentials uc
|
|
JOIN users u ON uc.username = u.username
|
|
WHERE uc.username = ?
|
|
""", (username,))
|
|
|
|
result = cursor.fetchone()
|
|
if not result:
|
|
return None
|
|
|
|
password_hash, salt, email, role, created_at, is_active = result
|
|
|
|
# Check if user is active
|
|
if not is_active:
|
|
return None
|
|
|
|
# Verify password
|
|
if self._hash_password(password, salt) == password_hash:
|
|
# Update last login
|
|
last_login = time.strftime("%Y-%m-%d %H:%M:%S")
|
|
cursor.execute("""
|
|
UPDATE users SET last_login = ? WHERE username = ?
|
|
""", (last_login, username))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return User(
|
|
username=username,
|
|
email=email,
|
|
role=role,
|
|
created_at=created_at,
|
|
last_login=last_login,
|
|
is_active=is_active
|
|
)
|
|
|
|
conn.close()
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"Error authenticating user: {e}")
|
|
return None
|
|
|
|
def get_user(self, username: str) -> Optional[User]:
|
|
"""Get user information by username"""
|
|
try:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT username, email, role, created_at, last_login, is_active
|
|
FROM users WHERE username = ?
|
|
""", (username,))
|
|
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if result:
|
|
username, email, role, created_at, last_login, is_active = result
|
|
return User(
|
|
username=username,
|
|
email=email,
|
|
role=role,
|
|
created_at=created_at,
|
|
last_login=last_login,
|
|
is_active=is_active
|
|
)
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"Error getting user: {e}")
|
|
return None
|
|
|
|
def list_users(self) -> List[User]:
|
|
"""List all users in the system"""
|
|
try:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT username, email, role, created_at, last_login, is_active
|
|
FROM users ORDER BY username
|
|
""")
|
|
|
|
users = []
|
|
for row in cursor.fetchall():
|
|
username, email, role, created_at, last_login, is_active = row
|
|
users.append(User(
|
|
username=username,
|
|
email=email,
|
|
role=role,
|
|
created_at=created_at,
|
|
last_login=last_login,
|
|
is_active=is_active
|
|
))
|
|
|
|
conn.close()
|
|
return users
|
|
|
|
except Exception as e:
|
|
print(f"Error listing users: {e}")
|
|
return []
|
|
|
|
def update_user_role(self, username: str, new_role: str) -> bool:
|
|
"""Update a user's role"""
|
|
try:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
UPDATE users SET role = ? WHERE username = ?
|
|
""", (new_role, username))
|
|
|
|
if cursor.rowcount > 0:
|
|
conn.commit()
|
|
conn.close()
|
|
return True
|
|
|
|
conn.close()
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"Error updating user role: {e}")
|
|
return False
|
|
|
|
def deactivate_user(self, username: str) -> bool:
|
|
"""Deactivate a user account"""
|
|
try:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
UPDATE users SET is_active = 0 WHERE username = ?
|
|
""", (username,))
|
|
|
|
if cursor.rowcount > 0:
|
|
conn.commit()
|
|
conn.close()
|
|
return True
|
|
|
|
conn.close()
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"Error deactivating user: {e}")
|
|
return False
|
|
|
|
def get_user_permissions(self, username: str) -> List[str]:
|
|
"""Get permissions for a user based on their role"""
|
|
try:
|
|
conn = sqlite3.connect(self.db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT r.permissions FROM roles r
|
|
JOIN users u ON r.name = u.role
|
|
WHERE u.username = ?
|
|
""", (username,))
|
|
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if result:
|
|
permissions = json.loads(result[0])
|
|
return permissions
|
|
|
|
return []
|
|
|
|
except Exception as e:
|
|
print(f"Error getting user permissions: {e}")
|
|
return []
|
|
|
|
def check_permission(self, username: str, permission: str) -> bool:
|
|
"""Check if a user has a specific permission"""
|
|
permissions = self.get_user_permissions(username)
|
|
return "*" in permissions or permission in permissions
|