deb-mock/deb_mock/uid_manager.py
robojerk c51819c836
Some checks failed
Build Deb-Mock Package / build (push) Failing after 1m9s
Lint Code / Lint All Code (push) Failing after 1s
Test Deb-Mock Build / test (push) Failing after 35s
Add comprehensive testing framework, performance monitoring, and plugin system
- Add complete pytest testing framework with conftest.py and test files
- Add performance monitoring and benchmarking capabilities
- Add plugin system with ccache plugin example
- Add comprehensive documentation (API, deployment, testing, etc.)
- Add Docker API wrapper for service deployment
- Add advanced configuration examples
- Remove old wget package file
- Update core modules with enhanced functionality
2025-08-19 20:49:32 -07:00

305 lines
11 KiB
Python

"""
UID/GID management for deb-mock
Based on Fedora Mock's UID management system
"""
import os
import grp
import pwd
import subprocess
import logging
from contextlib import contextmanager
from typing import Optional, Tuple, Dict, Any
from .exceptions import UIDManagerError
class UIDManager:
    """Manages UID/GID operations for deb-mock chroots.

    The manager keeps a stack of saved process credentials (real/effective
    UID and GID) together with a snapshot of ``os.environ`` so that privilege
    changes can be nested and undone in LIFO order.  It also provides helpers
    for creating the build user inside a chroot and fixing up ownership.
    """

    def __init__(self, config):
        """Read the chroot user settings from *config* and record the caller.

        Args:
            config: Configuration object.  The optional attributes
                ``chroot_user``, ``chroot_group``, ``chroot_uid``,
                ``chroot_gid`` and ``use_host_user`` are read with
                ``getattr``; missing ones fall back to defaults.

        Raises:
            UIDManagerError: If validating the configuration fails.
        """
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Default user/group configuration
        self.chroot_user = getattr(config, 'chroot_user', 'build')
        self.chroot_group = getattr(config, 'chroot_group', 'build')
        self.chroot_uid = getattr(config, 'chroot_uid', 1000)
        self.chroot_gid = getattr(config, 'chroot_gid', 1000)

        # Current user information
        self.current_uid = os.getuid()
        self.current_gid = os.getgid()
        try:
            self.current_user = pwd.getpwuid(self.current_uid).pw_name
        except KeyError:
            # The UID has no passwd entry (e.g. an arbitrary UID inside a
            # container); fall back to the numeric ID instead of crashing.
            self.current_user = str(self.current_uid)

        # Privilege stack for context management.  Both lists are pushed and
        # popped together, so they always have the same length.
        self._privilege_stack = []
        self._environment_stack = []

        # Validate configuration
        self._validate_config()

    def _validate_config(self):
        """Validate UID/GID configuration.

        Missing host users/groups and IDs below 1000 only produce warnings.

        Raises:
            UIDManagerError: If any unexpected error occurs during validation
                (for example a non-numeric UID/GID in the configuration).
        """
        try:
            # Check if chroot user/group exist on host
            if getattr(self.config, 'use_host_user', False):
                try:
                    pwd.getpwnam(self.chroot_user)
                    grp.getgrnam(self.chroot_group)
                except KeyError as e:
                    self.logger.warning(f"Host user/group not found: {e}")

            # Validate UID/GID ranges
            if self.chroot_uid < 1000:
                self.logger.warning(f"Chroot UID {self.chroot_uid} is below 1000")
            if self.chroot_gid < 1000:
                self.logger.warning(f"Chroot GID {self.chroot_gid} is below 1000")
        except Exception as e:
            raise UIDManagerError(f"UID configuration validation failed: {e}") from e

    @contextmanager
    def elevated_privileges(self):
        """Context manager that runs its body with root credentials.

        The previous credentials and environment are restored on exit, even
        if the body (or the elevation itself) raises.

        Raises:
            UIDManagerError: If the process cannot become root.
        """
        self._push_privileges()
        try:
            # Elevation happens inside the ``try`` so that a failure still
            # pops the state pushed above instead of leaving it on the stack.
            self._elevate_privileges()
            yield
        finally:
            self._restore_privileges()

    def _push_privileges(self):
        """Save the current credentials and environment on the stacks."""
        self._privilege_stack.append({
            'ruid': os.getuid(),
            'euid': os.geteuid(),
            'rgid': os.getgid(),
            'egid': os.getegid(),
        })
        self._environment_stack.append(dict(os.environ))

    def _elevate_privileges(self):
        """Switch real, effective and saved IDs to root.

        Raises:
            UIDManagerError: If the process is not permitted to become root.
        """
        try:
            # UID first: when root is only available through the saved
            # set-user-ID (the state left by become_user), the GID change
            # below is only permitted once the effective UID is 0 again.
            os.setresuid(0, 0, 0)
            os.setregid(0, 0)
        except PermissionError as e:
            raise UIDManagerError(
                "Failed to elevate privileges - requires root access"
            ) from e

    def _restore_privileges(self):
        """Pop the most recently saved state and reinstate it.

        Does nothing when the stack is empty.  The environment is always
        restored; the IDs are only touched when they differ from the saved
        ones.
        """
        if not self._privilege_stack:
            return

        privs = self._privilege_stack.pop()
        env = self._environment_stack.pop()

        # Restore environment
        os.environ.clear()
        os.environ.update(env)

        current = {
            'ruid': os.getuid(),
            'euid': os.geteuid(),
            'rgid': os.getgid(),
            'egid': os.getegid(),
        }
        if current == privs:
            # Nothing changed (e.g. elevation failed); avoid needless
            # credential syscalls.
            return

        if current['euid'] != 0:
            # Regain root through the saved set-user-ID so the calls below
            # are permitted.  If there is no saved root ID, carry on and let
            # them succeed or fail on their own.
            try:
                os.seteuid(0)
            except PermissionError:
                pass

        # Restore UID/GID: group first, while still privileged.  The saved
        # set-user-ID is left untouched (-1) so enclosing scopes can still
        # restore their own state.
        os.setregid(privs['rgid'], privs['egid'])
        os.setresuid(privs['ruid'], privs['euid'], -1)

    def become_user(self, uid: int, gid: Optional[int] = None) -> None:
        """Switch the process to *uid*/*gid* until restore_privileges().

        Args:
            uid: User ID to assume (real and effective).
            gid: Group ID to assume; defaults to the same number as *uid*.

        Raises:
            UIDManagerError: If the process cannot become root first.
            OSError: If changing to the requested IDs fails.
        """
        if gid is None:
            gid = uid

        self._push_privileges()
        try:
            self._elevate_privileges()
            os.setregid(gid, gid)
            # Keep the saved set-user-ID at 0: setreuid(uid, uid) would
            # overwrite it and make restore_privileges() impossible.
            os.setresuid(uid, uid, 0)
        except Exception:
            # Undo the push (and any partial change) before propagating.
            self._restore_privileges()
            raise

    def restore_privileges(self) -> None:
        """Restore the state saved by the most recent privilege change."""
        self._restore_privileges()

    def change_owner(self, path: str, uid: Optional[int] = None, gid: Optional[int] = None, recursive: bool = False) -> None:
        """Change ownership of a file or directory tree.

        Args:
            path: File or directory to change.
            uid: New owner; defaults to the configured chroot UID.
            gid: New group; defaults to the configured chroot GID.
            recursive: Also change everything below *path*.

        Raises:
            UIDManagerError: If the process cannot become root.
        """
        if uid is None:
            uid = self.chroot_uid
        if gid is None:
            gid = self.chroot_gid

        with self.elevated_privileges():
            self._tolerant_chown(path, uid, gid)
            if recursive:
                for root, dirs, files in os.walk(path):
                    for name in dirs + files:
                        self._tolerant_chown(os.path.join(root, name), uid, gid)

    def _tolerant_chown(self, path: str, uid: int, gid: int) -> None:
        """Change ownership of *path* without raising.

        Uses ``lchown`` so symlinks themselves are changed, not their
        targets.  A missing path is ignored silently; any other OS error is
        logged as a warning.
        """
        try:
            os.lchown(path, uid, gid)
        except FileNotFoundError:
            # ENOENT: the path vanished or never existed - nothing to do.
            pass
        except OSError as e:
            self.logger.warning(f"Failed to change ownership of {path}: {e}")

    def create_chroot_user(self, chroot_path: str) -> None:
        """Create the build group, user and home directory in the chroot.

        Args:
            chroot_path: Root directory of the chroot.

        Raises:
            UIDManagerError: If elevation fails or any creation step fails.
        """
        with self.elevated_privileges():
            try:
                # Create group first
                self._create_group_in_chroot(chroot_path, self.chroot_group, self.chroot_gid)
                # Create user
                self._create_user_in_chroot(chroot_path, self.chroot_user, self.chroot_uid, self.chroot_gid)
                # Setup home directory
                self._setup_home_directory(chroot_path)
                self.logger.info(f"Created chroot user {self.chroot_user} (UID: {self.chroot_uid}, GID: {self.chroot_gid})")
            except Exception as e:
                raise UIDManagerError(f"Failed to create chroot user: {e}") from e

    @staticmethod
    def _entry_exists(db_file: str, name: str) -> bool:
        """Return True if *db_file* has a line starting with ``name:``.

        A missing file counts as "no entry".
        """
        prefix = f"{name}:"
        try:
            with open(db_file, 'r') as f:
                return any(line.startswith(prefix) for line in f)
        except FileNotFoundError:
            return False

    @staticmethod
    def _ensure_entry(db_file: str, name: str, entry: str) -> None:
        """Append *entry* to *db_file* unless a ``name:`` line already exists.

        Creates the parent directory when needed and makes sure the new
        entry starts on its own line even if the file lacks a final newline.
        """
        prefix = f"{name}:"
        last_line = ""
        if os.path.exists(db_file):
            with open(db_file, 'r') as f:
                for line in f:
                    if line.startswith(prefix):
                        return  # Entry already exists
                    last_line = line

        # Ensure /etc directory exists
        os.makedirs(os.path.dirname(db_file), exist_ok=True)

        with open(db_file, 'a') as f:
            if last_line and not last_line.endswith("\n"):
                # Without this the new entry would be glued onto the
                # previous record and corrupt both.
                f.write("\n")
            f.write(entry)

    def _create_group_in_chroot(self, chroot_path: str, group_name: str, gid: int) -> None:
        """Add *group_name* to the chroot's ``etc/group`` if not present."""
        group_file = os.path.join(chroot_path, 'etc', 'group')
        self._ensure_entry(group_file, group_name, f"{group_name}:x:{gid}:\n")

    def _create_user_in_chroot(self, chroot_path: str, username: str, uid: int, gid: int) -> None:
        """Add *username* to the chroot's ``etc/passwd`` if not present.

        The entry uses ``/home/<username>`` as home and ``/bin/bash`` as
        shell.
        """
        passwd_file = os.path.join(chroot_path, 'etc', 'passwd')
        user_entry = f"{username}:x:{uid}:{gid}:Build User:/home/{username}:/bin/bash\n"
        self._ensure_entry(passwd_file, username, user_entry)

    def _setup_home_directory(self, chroot_path: str) -> None:
        """Create the build user's home directory, owned by it, mode 0755."""
        home_dir = os.path.join(chroot_path, 'home', self.chroot_user)
        # Create home directory
        os.makedirs(home_dir, exist_ok=True)
        # Set ownership
        self._tolerant_chown(home_dir, self.chroot_uid, self.chroot_gid)
        # Set permissions
        os.chmod(home_dir, 0o755)

    def copy_host_user(self, chroot_path: str, username: str) -> None:
        """Copy a host user and its primary group into the chroot.

        Args:
            chroot_path: Root directory of the chroot.
            username: Name of the user on the host.

        Raises:
            UIDManagerError: If the host user or its primary group cannot be
                looked up, or writing the chroot files fails.
        """
        try:
            # Get user info from host
            user_info = pwd.getpwnam(username)
            uid = user_info.pw_uid
            gid = user_info.pw_gid

            # Get group info
            group_name = grp.getgrgid(gid).gr_name

            # Create in chroot
            self._create_group_in_chroot(chroot_path, group_name, gid)
            self._create_user_in_chroot(chroot_path, username, uid, gid)
            self.logger.info(f"Copied host user {username} (UID: {uid}, GID: {gid}) to chroot")
        except KeyError as e:
            raise UIDManagerError(f"Host user {username} not found: {e}") from e
        except Exception as e:
            raise UIDManagerError(f"Failed to copy host user {username}: {e}") from e

    def setup_chroot_permissions(self, chroot_path: str) -> None:
        """Give the build user ownership of key chroot directories.

        Existing ``home``, ``tmp``, ``var/tmp``, ``var/cache`` and
        ``var/log`` directories are chowned to the chroot UID/GID, and
        ``tmp`` is set to mode 1777.

        Raises:
            UIDManagerError: If elevation fails or a permission change fails.
        """
        with self.elevated_privileges():
            try:
                # Change ownership of key directories
                key_dirs = [
                    'home',
                    'tmp',
                    'var/tmp',
                    'var/cache',
                    'var/log',
                ]
                for dir_name in key_dirs:
                    dir_path = os.path.join(chroot_path, dir_name)
                    if os.path.exists(dir_path):
                        self._tolerant_chown(dir_path, self.chroot_uid, self.chroot_gid)

                # Ensure proper permissions on /tmp (world-writable + sticky)
                tmp_path = os.path.join(chroot_path, 'tmp')
                if os.path.exists(tmp_path):
                    os.chmod(tmp_path, 0o1777)

                self.logger.info("Chroot permissions setup complete")
            except Exception as e:
                raise UIDManagerError(f"Failed to setup chroot permissions: {e}") from e

    def get_user_info(self) -> Dict[str, Any]:
        """Return the recorded host user and configured chroot user details."""
        return {
            'current_uid': self.current_uid,
            'current_gid': self.current_gid,
            'current_user': self.current_user,
            'chroot_user': self.chroot_user,
            'chroot_group': self.chroot_group,
            'chroot_uid': self.chroot_uid,
            'chroot_gid': self.chroot_gid,
        }

    def validate_chroot_user(self, chroot_path: str) -> bool:
        """Check that the chroot user and group are present in the chroot.

        Returns:
            True only if ``etc/passwd`` has a line for the chroot user and
            ``etc/group`` has a line for the chroot group; False if either
            file or entry is missing.
        """
        passwd_file = os.path.join(chroot_path, 'etc', 'passwd')
        group_file = os.path.join(chroot_path, 'etc', 'group')
        return (
            self._entry_exists(passwd_file, self.chroot_user)
            and self._entry_exists(group_file, self.chroot_group)
        )