""" UID/GID management for deb-mock Based on Fedora Mock's UID management system """ import os import grp import pwd import subprocess import logging from contextlib import contextmanager from typing import Optional, Tuple, Dict, Any from .exceptions import UIDManagerError class UIDManager: """Manages UID/GID operations for deb-mock chroots""" def __init__(self, config): self.config = config self.logger = logging.getLogger(__name__) # Default user/group configuration self.chroot_user = getattr(config, 'chroot_user', 'build') self.chroot_group = getattr(config, 'chroot_group', 'build') self.chroot_uid = getattr(config, 'chroot_uid', 1000) self.chroot_gid = getattr(config, 'chroot_gid', 1000) # Current user information self.current_uid = os.getuid() self.current_gid = os.getgid() self.current_user = pwd.getpwuid(self.current_uid).pw_name # Privilege stack for context management self._privilege_stack = [] self._environment_stack = [] # Validate configuration self._validate_config() def _validate_config(self): """Validate UID/GID configuration""" try: # Check if chroot user/group exist on host if hasattr(self.config, 'use_host_user') and self.config.use_host_user: try: pwd.getpwnam(self.chroot_user) grp.getgrnam(self.chroot_group) except KeyError as e: self.logger.warning(f"Host user/group not found: {e}") # Validate UID/GID ranges if self.chroot_uid < 1000: self.logger.warning(f"Chroot UID {self.chroot_uid} is below 1000") if self.chroot_gid < 1000: self.logger.warning(f"Chroot GID {self.chroot_gid} is below 1000") except Exception as e: raise UIDManagerError(f"UID configuration validation failed: {e}") @contextmanager def elevated_privileges(self): """Context manager for elevated privileges""" self._push_privileges() self._elevate_privileges() try: yield finally: self._restore_privileges() def _push_privileges(self): """Save current privilege state""" self._privilege_stack.append({ 'ruid': os.getuid(), 'euid': os.geteuid(), 'rgid': os.getgid(), 'egid': os.getegid(), }) self._environment_stack.append(dict(os.environ)) def _elevate_privileges(self): """Elevate to root privileges""" try: os.setregid(0, 0) os.setreuid(0, 0) except PermissionError: raise UIDManagerError("Failed to elevate privileges - requires root access") def _restore_privileges(self): """Restore previous privilege state""" if not self._privilege_stack: return privs = self._privilege_stack.pop() env = self._environment_stack.pop() # Restore environment os.environ.clear() os.environ.update(env) # Restore UID/GID os.setregid(privs['rgid'], privs['egid']) os.setreuid(privs['ruid'], privs['euid']) def become_user(self, uid: int, gid: Optional[int] = None) -> None: """Become a specific user/group""" if gid is None: gid = uid self._push_privileges() self._elevate_privileges() os.setregid(gid, gid) os.setreuid(uid, uid) def restore_privileges(self) -> None: """Restore previous privilege state""" self._restore_privileges() def change_owner(self, path: str, uid: Optional[int] = None, gid: Optional[int] = None, recursive: bool = False) -> None: """Change ownership of files/directories""" if uid is None: uid = self.chroot_uid if gid is None: gid = self.chroot_gid with self.elevated_privileges(): self._tolerant_chown(path, uid, gid) if recursive: for root, dirs, files in os.walk(path): for d in dirs: self._tolerant_chown(os.path.join(root, d), uid, gid) for f in files: self._tolerant_chown(os.path.join(root, f), uid, gid) def _tolerant_chown(self, path: str, uid: int, gid: int) -> None: """Change ownership without raising errors for missing files""" try: os.lchown(path, uid, gid) except OSError as e: if e.errno != 2: # ENOENT - No such file or directory self.logger.warning(f"Failed to change ownership of {path}: {e}") def create_chroot_user(self, chroot_path: str) -> None: """Create the build user in the chroot""" with self.elevated_privileges(): try: # Create group first self._create_group_in_chroot(chroot_path, self.chroot_group, self.chroot_gid) # Create user self._create_user_in_chroot(chroot_path, self.chroot_user, self.chroot_uid, self.chroot_gid) # Setup home directory self._setup_home_directory(chroot_path) self.logger.info(f"Created chroot user {self.chroot_user} (UID: {self.chroot_uid}, GID: {self.chroot_gid})") except Exception as e: raise UIDManagerError(f"Failed to create chroot user: {e}") def _create_group_in_chroot(self, chroot_path: str, group_name: str, gid: int) -> None: """Create a group in the chroot""" group_file = os.path.join(chroot_path, 'etc', 'group') # Check if group already exists if os.path.exists(group_file): with open(group_file, 'r') as f: for line in f: if line.startswith(f"{group_name}:"): return # Group already exists # Create group entry group_entry = f"{group_name}:x:{gid}:\n" # Ensure /etc directory exists os.makedirs(os.path.dirname(group_file), exist_ok=True) # Append to group file with open(group_file, 'a') as f: f.write(group_entry) def _create_user_in_chroot(self, chroot_path: str, username: str, uid: int, gid: int) -> None: """Create a user in the chroot""" passwd_file = os.path.join(chroot_path, 'etc', 'passwd') home_dir = os.path.join(chroot_path, 'home', username) # Check if user already exists if os.path.exists(passwd_file): with open(passwd_file, 'r') as f: for line in f: if line.startswith(f"{username}:"): return # User already exists # Create user entry user_entry = f"{username}:x:{uid}:{gid}:Build User:/home/{username}:/bin/bash\n" # Ensure /etc directory exists os.makedirs(os.path.dirname(passwd_file), exist_ok=True) # Append to passwd file with open(passwd_file, 'a') as f: f.write(user_entry) def _setup_home_directory(self, chroot_path: str) -> None: """Setup home directory for the build user""" home_dir = os.path.join(chroot_path, 'home', self.chroot_user) # Create home directory os.makedirs(home_dir, exist_ok=True) # Set ownership self._tolerant_chown(home_dir, self.chroot_uid, self.chroot_gid) # Set permissions os.chmod(home_dir, 0o755) def copy_host_user(self, chroot_path: str, username: str) -> None: """Copy a user from the host system to the chroot""" try: # Get user info from host user_info = pwd.getpwnam(username) uid = user_info.pw_uid gid = user_info.pw_gid # Get group info group_info = grp.getgrgid(gid) group_name = group_info.gr_name # Create in chroot self._create_group_in_chroot(chroot_path, group_name, gid) self._create_user_in_chroot(chroot_path, username, uid, gid) self.logger.info(f"Copied host user {username} (UID: {uid}, GID: {gid}) to chroot") except KeyError as e: raise UIDManagerError(f"Host user {username} not found: {e}") except Exception as e: raise UIDManagerError(f"Failed to copy host user {username}: {e}") def setup_chroot_permissions(self, chroot_path: str) -> None: """Setup proper permissions for the chroot""" with self.elevated_privileges(): try: # Change ownership of key directories key_dirs = [ 'home', 'tmp', 'var/tmp', 'var/cache', 'var/log' ] for dir_name in key_dirs: dir_path = os.path.join(chroot_path, dir_name) if os.path.exists(dir_path): self._tolerant_chown(dir_path, self.chroot_uid, self.chroot_gid) # Ensure proper permissions on /tmp tmp_path = os.path.join(chroot_path, 'tmp') if os.path.exists(tmp_path): os.chmod(tmp_path, 0o1777) self.logger.info("Chroot permissions setup complete") except Exception as e: raise UIDManagerError(f"Failed to setup chroot permissions: {e}") def get_user_info(self) -> Dict[str, Any]: """Get current user information""" return { 'current_uid': self.current_uid, 'current_gid': self.current_gid, 'current_user': self.current_user, 'chroot_user': self.chroot_user, 'chroot_group': self.chroot_group, 'chroot_uid': self.chroot_uid, 'chroot_gid': self.chroot_gid } def validate_chroot_user(self, chroot_path: str) -> bool: """Validate that the chroot user exists and is properly configured""" passwd_file = os.path.join(chroot_path, 'etc', 'passwd') group_file = os.path.join(chroot_path, 'etc', 'group') if not os.path.exists(passwd_file) or not os.path.exists(group_file): return False # Check if user exists user_exists = False group_exists = False with open(passwd_file, 'r') as f: for line in f: if line.startswith(f"{self.chroot_user}:"): user_exists = True break with open(group_file, 'r') as f: for line in f: if line.startswith(f"{self.chroot_group}:"): group_exists = True break return user_exists and group_exists