- Add complete pytest testing framework with conftest.py and test files - Add performance monitoring and benchmarking capabilities - Add plugin system with ccache plugin example - Add comprehensive documentation (API, deployment, testing, etc.) - Add Docker API wrapper for service deployment - Add advanced configuration examples - Remove old wget package file - Update core modules with enhanced functionality
305 lines
11 KiB
Python
305 lines
11 KiB
Python
"""
|
|
UID/GID management for deb-mock
|
|
Based on Fedora Mock's UID management system
|
|
"""
|
|
|
|
import os
|
|
import grp
|
|
import pwd
|
|
import subprocess
|
|
import logging
|
|
from contextlib import contextmanager
|
|
from typing import Optional, Tuple, Dict, Any
|
|
|
|
from .exceptions import UIDManagerError
|
|
|
|
|
|
class UIDManager:
    """Manages UID/GID operations for deb-mock chroots.

    The manager keeps a stack of saved process identities (real, effective
    and saved UID/GID plus a snapshot of ``os.environ``) so that privilege
    changes can be nested and undone in LIFO order.

    The privilege-switching methods rely on ``os.setresuid``/``os.setresgid``
    and therefore require a platform that provides them (e.g. Linux).
    """

    def __init__(self, config):
        """Read user/group settings from *config* and validate them.

        Args:
            config: Object optionally exposing ``chroot_user``,
                ``chroot_group``, ``chroot_uid``, ``chroot_gid`` and
                ``use_host_user`` attributes. Missing attributes fall back
                to ``'build'`` / ``1000``.

        Raises:
            UIDManagerError: If configuration validation fails.
        """
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Default user/group configuration
        self.chroot_user = getattr(config, 'chroot_user', 'build')
        self.chroot_group = getattr(config, 'chroot_group', 'build')
        self.chroot_uid = getattr(config, 'chroot_uid', 1000)
        self.chroot_gid = getattr(config, 'chroot_gid', 1000)

        # Current user information
        self.current_uid = os.getuid()
        self.current_gid = os.getgid()
        try:
            self.current_user = pwd.getpwuid(self.current_uid).pw_name
        except KeyError:
            # The running UID may have no passwd entry (e.g. a container
            # started with an arbitrary UID); fall back to the numeric id
            # instead of failing construction.
            self.current_user = str(self.current_uid)

        # Privilege stack for context management
        self._privilege_stack = []
        self._environment_stack = []

        # Validate configuration
        self._validate_config()

    def _validate_config(self):
        """Validate UID/GID configuration.

        Only logs warnings for a missing host user/group or for IDs below
        1000; any unexpected error is re-raised as ``UIDManagerError``.
        """
        try:
            # Check if chroot user/group exist on host
            if getattr(self.config, 'use_host_user', False):
                try:
                    pwd.getpwnam(self.chroot_user)
                    grp.getgrnam(self.chroot_group)
                except KeyError as e:
                    self.logger.warning(f"Host user/group not found: {e}")

            # Validate UID/GID ranges
            if self.chroot_uid < 1000:
                self.logger.warning(f"Chroot UID {self.chroot_uid} is below 1000")
            if self.chroot_gid < 1000:
                self.logger.warning(f"Chroot GID {self.chroot_gid} is below 1000")

        except Exception as e:
            raise UIDManagerError(f"UID configuration validation failed: {e}") from e

    @contextmanager
    def elevated_privileges(self):
        """Context manager that runs its body with root privileges.

        The previous identity and environment are restored on exit, and
        also when the elevation itself fails, so the internal stacks never
        keep a stale entry.

        Raises:
            UIDManagerError: If the process cannot become root.
        """
        self._push_privileges()
        try:
            self._elevate_privileges()
            yield
        finally:
            self._restore_privileges()

    def _push_privileges(self):
        """Save the current privilege state and environment on the stacks."""
        ruid, euid, suid = os.getresuid()
        rgid, egid, sgid = os.getresgid()
        self._privilege_stack.append({
            'ruid': ruid,
            'euid': euid,
            'suid': suid,
            'rgid': rgid,
            'egid': egid,
            'sgid': sgid,
        })
        self._environment_stack.append(dict(os.environ))

    def _elevate_privileges(self):
        """Elevate to root privileges.

        Raises:
            UIDManagerError: If the process is not permitted to become root.
        """
        try:
            # UID first: when root is only available through the saved
            # set-user-ID, the GID change is not permitted until the
            # effective UID is 0 again.
            os.setresuid(0, 0, 0)
            os.setresgid(0, 0, 0)
        except PermissionError as e:
            raise UIDManagerError("Failed to elevate privileges - requires root access") from e

    def _restore_privileges(self):
        """Restore the most recently saved privilege state (no-op if none)."""
        if not self._privilege_stack:
            return

        privs = self._privilege_stack.pop()
        env = self._environment_stack.pop()

        # Restore environment
        os.environ.clear()
        os.environ.update(env)

        # Changing GIDs requires an effective UID of 0, so regain root from
        # the saved set-user-ID first when we are currently unprivileged.
        if os.geteuid() != 0:
            try:
                os.setresuid(0, 0, 0)
            except PermissionError:
                # No root identity to fall back on. The calls below are
                # still attempted: they succeed when they only re-assert
                # IDs the process already holds and raise otherwise.
                pass

        # Restore UID/GID (GID first, while still privileged)
        os.setresgid(privs['rgid'], privs['egid'], privs['sgid'])
        os.setresuid(privs['ruid'], privs['euid'], privs['suid'])

    def become_user(self, uid: int, gid: Optional[int] = None) -> None:
        """Become a specific user/group until ``restore_privileges()``.

        Args:
            uid: UID to switch the real and effective user ID to.
            gid: GID to switch to; defaults to *uid* when omitted.

        Raises:
            UIDManagerError: If the process cannot become root first.
            OSError: If switching to *uid*/*gid* fails.
        """
        if gid is None:
            gid = uid

        self._push_privileges()
        try:
            self._elevate_privileges()
            os.setregid(gid, gid)
            # Keep the saved set-user-ID at 0 so the switch is reversible;
            # setreuid(uid, uid) would overwrite it and make
            # restore_privileges() fail with PermissionError.
            os.setresuid(uid, uid, 0)
        except Exception:
            # Do not leave a half-switched identity or a stale stack entry.
            self._restore_privileges()
            raise

    def restore_privileges(self) -> None:
        """Restore previous privilege state"""
        self._restore_privileges()

    def change_owner(self, path: str, uid: Optional[int] = None, gid: Optional[int] = None, recursive: bool = False) -> None:
        """Change ownership of files/directories.

        Args:
            path: File or directory to chown.
            uid: New owner UID; defaults to the configured chroot UID.
            gid: New owner GID; defaults to the configured chroot GID.
            recursive: Also chown everything below *path*.

        Raises:
            UIDManagerError: If root privileges cannot be obtained.
        """
        if uid is None:
            uid = self.chroot_uid
        if gid is None:
            gid = self.chroot_gid

        with self.elevated_privileges():
            self._tolerant_chown(path, uid, gid)
            if recursive:
                for root, dirs, files in os.walk(path):
                    for name in dirs + files:
                        self._tolerant_chown(os.path.join(root, name), uid, gid)

    def _tolerant_chown(self, path: str, uid: int, gid: int) -> None:
        """Change ownership without raising errors.

        Uses ``os.lchown`` so symlinks themselves are changed, not their
        targets. Missing paths are ignored silently; any other ``OSError``
        is logged as a warning.
        """
        try:
            os.lchown(path, uid, gid)
        except FileNotFoundError:
            # Path does not exist (ENOENT): nothing to chown.
            pass
        except OSError as e:
            self.logger.warning(f"Failed to change ownership of {path}: {e}")

    def create_chroot_user(self, chroot_path: str) -> None:
        """Create the build user, its group and home directory in the chroot.

        Args:
            chroot_path: Root directory of the chroot.

        Raises:
            UIDManagerError: If elevation or any creation step fails.
        """
        with self.elevated_privileges():
            try:
                # Create group first
                self._create_group_in_chroot(chroot_path, self.chroot_group, self.chroot_gid)

                # Create user
                self._create_user_in_chroot(chroot_path, self.chroot_user, self.chroot_uid, self.chroot_gid)

                # Setup home directory
                self._setup_home_directory(chroot_path)

                self.logger.info(f"Created chroot user {self.chroot_user} (UID: {self.chroot_uid}, GID: {self.chroot_gid})")

            except Exception as e:
                raise UIDManagerError(f"Failed to create chroot user: {e}") from e

    @staticmethod
    def _has_entry(db_file: str, name: str) -> bool:
        """Return True if *db_file* has a line starting with ``"<name>:"``.

        A missing file counts as "no entry".
        """
        prefix = f"{name}:"
        try:
            with open(db_file, 'r') as f:
                return any(line.startswith(prefix) for line in f)
        except FileNotFoundError:
            return False

    @staticmethod
    def _append_entry(db_file: str, entry: str) -> None:
        """Append *entry* to *db_file*, creating the parent directory.

        If the existing file does not end with a newline, one is written
        first so the new record is not glued onto the previous line.
        """
        os.makedirs(os.path.dirname(db_file), exist_ok=True)

        needs_newline = False
        try:
            with open(db_file, 'rb') as f:
                f.seek(0, os.SEEK_END)
                if f.tell() > 0:
                    f.seek(-1, os.SEEK_END)
                    needs_newline = f.read(1) != b'\n'
        except FileNotFoundError:
            pass  # File will be created by the append below.

        with open(db_file, 'a') as f:
            if needs_newline:
                f.write('\n')
            f.write(entry)

    def _create_group_in_chroot(self, chroot_path: str, group_name: str, gid: int) -> None:
        """Add *group_name* to the chroot's ``etc/group`` unless present."""
        group_file = os.path.join(chroot_path, 'etc', 'group')

        if self._has_entry(group_file, group_name):
            return  # Group already exists

        self._append_entry(group_file, f"{group_name}:x:{gid}:\n")

    def _create_user_in_chroot(self, chroot_path: str, username: str, uid: int, gid: int) -> None:
        """Add *username* to the chroot's ``etc/passwd`` unless present."""
        passwd_file = os.path.join(chroot_path, 'etc', 'passwd')

        if self._has_entry(passwd_file, username):
            return  # User already exists

        self._append_entry(
            passwd_file,
            f"{username}:x:{uid}:{gid}:Build User:/home/{username}:/bin/bash\n",
        )

    def _setup_home_directory(self, chroot_path: str) -> None:
        """Create the build user's home directory, owned by it, mode 0755."""
        home_dir = os.path.join(chroot_path, 'home', self.chroot_user)

        # Create home directory
        os.makedirs(home_dir, exist_ok=True)

        # Set ownership
        self._tolerant_chown(home_dir, self.chroot_uid, self.chroot_gid)

        # Set permissions
        os.chmod(home_dir, 0o755)

    def copy_host_user(self, chroot_path: str, username: str) -> None:
        """Copy a user and its primary group from the host into the chroot.

        Args:
            chroot_path: Root directory of the chroot.
            username: Name of the host user to copy.

        Raises:
            UIDManagerError: If the host user/group lookup fails or the
                chroot files cannot be written.
        """
        try:
            # Get user info from host
            user_info = pwd.getpwnam(username)
            uid = user_info.pw_uid
            gid = user_info.pw_gid

            # Get group info
            group_name = grp.getgrgid(gid).gr_name

            # Create in chroot
            self._create_group_in_chroot(chroot_path, group_name, gid)
            self._create_user_in_chroot(chroot_path, username, uid, gid)

            self.logger.info(f"Copied host user {username} (UID: {uid}, GID: {gid}) to chroot")

        except KeyError as e:
            raise UIDManagerError(f"Host user {username} not found: {e}") from e
        except Exception as e:
            raise UIDManagerError(f"Failed to copy host user {username}: {e}") from e

    def setup_chroot_permissions(self, chroot_path: str) -> None:
        """Give the build user ownership of key chroot directories.

        Existing ``home``, ``tmp``, ``var/tmp``, ``var/cache`` and
        ``var/log`` directories are chowned (non-recursively) to the chroot
        UID/GID, and ``tmp`` is set to mode 1777.

        Raises:
            UIDManagerError: If elevation or any permission change fails.
        """
        with self.elevated_privileges():
            try:
                # Change ownership of key directories
                key_dirs = ('home', 'tmp', 'var/tmp', 'var/cache', 'var/log')

                for dir_name in key_dirs:
                    dir_path = os.path.join(chroot_path, dir_name)
                    if os.path.exists(dir_path):
                        self._tolerant_chown(dir_path, self.chroot_uid, self.chroot_gid)

                # Ensure proper permissions on /tmp (world-writable + sticky)
                tmp_path = os.path.join(chroot_path, 'tmp')
                if os.path.exists(tmp_path):
                    os.chmod(tmp_path, 0o1777)

                self.logger.info("Chroot permissions setup complete")

            except Exception as e:
                raise UIDManagerError(f"Failed to setup chroot permissions: {e}") from e

    def get_user_info(self) -> Dict[str, Any]:
        """Return the host identity captured at construction and the chroot user settings."""
        return {
            'current_uid': self.current_uid,
            'current_gid': self.current_gid,
            'current_user': self.current_user,
            'chroot_user': self.chroot_user,
            'chroot_group': self.chroot_group,
            'chroot_uid': self.chroot_uid,
            'chroot_gid': self.chroot_gid,
        }

    def validate_chroot_user(self, chroot_path: str) -> bool:
        """Check that the chroot user and group are registered in the chroot.

        Returns:
            True only if ``etc/passwd`` has an entry for the chroot user
            and ``etc/group`` has an entry for the chroot group; False if
            either file or either entry is missing.
        """
        passwd_file = os.path.join(chroot_path, 'etc', 'passwd')
        group_file = os.path.join(chroot_path, 'etc', 'group')

        return (
            self._has_entry(passwd_file, self.chroot_user)
            and self._has_entry(group_file, self.chroot_group)
        )
|