particle-os-tools/src/apt-ostree.py/python/core/sysroot.py
Joe Particle 883fa1e70f
Some checks failed
Compile apt-layer (v2) / compile (push) Has been cancelled
feat: Implement production D-Bus security policy with root-only access
- Update D-Bus policy for production use (root-only access)
- Document production vs development policy rationale
- Enhance D-BUS.md with security considerations
- Update CHANGELOG.md with production security hardening
- Update TODO.md to reflect completed security improvements

This change implements a production-ready security model where only root
users can access the apt-ostree daemon, which is appropriate since all
operations (package installation, OSTree commits, ComposeFS management)
inherently require root privileges. This eliminates the need for complex
PolicyKit authorization rules and provides clear security boundaries.
2025-07-16 04:28:28 +00:00

439 lines
No EOL
15 KiB
Python

"""
Sysroot management system
"""
import os
import gi
import threading
import time
from gi.repository import GLib, GObject, Gio
from typing import Dict, Optional, List, Any
import logging
# Import OSTree bindings
try:
gi.require_version('OSTree', '1.0')
from gi.repository import OSTree
except ImportError:
OSTree = None
class AptOstreeSysroot(GObject.Object):
"""Manages the system root and OSTree repository"""
def __init__(self, config: Dict[str, Any], logger):
super().__init__()
self.config = config
self.logger = logger
# OSTree integration
self.ot_sysroot: Optional[OSTree.Sysroot] = None
self.repo: Optional[OSTree.Repo] = None
self.repo_last_stat = None
# Transaction management
self.transaction: Optional[Any] = None
self.close_transaction_timeout_id = None
# Security
self.authority = None # PolicyKit authority
# Interface management
self.os_interfaces = {}
self.osexperimental_interfaces = {}
# File monitoring
self.monitor = None
self.sig_changed = None
# State
self.path = config.get('sysroot.path', '/')
self.repo_path = config.get('sysroot.repo_path', '/var/lib/ostree/repo')
self.locked = False
self.lock_thread = None
self.logger.info(f"Sysroot initialized for path: {self.path}")
def initialize(self) -> bool:
"""Initialize the sysroot"""
try:
self.logger.info("Initializing sysroot")
# Check if OSTree is available
if OSTree is None:
self.logger.warning("OSTree bindings not available, running in test mode")
return self._initialize_test_mode()
# Check if we're in an OSTree system
if not os.path.exists("/run/ostree-booted"):
self.logger.warning("Not in OSTree system, running in test mode")
return self._initialize_test_mode()
# Initialize OSTree sysroot
self.ot_sysroot = OSTree.Sysroot.new(Gio.File.new_for_path(self.path))
# Load sysroot
self.ot_sysroot.load(None)
# Initialize repository
if not self._initialize_repository():
return False
# Setup file monitoring
self._setup_file_monitoring()
# Load deployments
self._load_deployments()
self.logger.info("Sysroot initialized successfully")
return True
except Exception as e:
self.logger.error(f"Failed to initialize sysroot: {e}")
return self._initialize_test_mode()
def _initialize_test_mode(self) -> bool:
"""Initialize in test mode without OSTree"""
try:
self.logger.info("Initializing in test mode")
# Create mock deployments for testing
self.os_interfaces = {
'test-os': {
'name': 'test-os',
'deployments': [
{
'checksum': 'test-commit-123',
'booted': True,
'pinned': False,
'version': 'test-1.0'
}
],
'cached_update': None
}
}
self.logger.info("Test mode initialized successfully")
return True
except Exception as e:
self.logger.error(f"Failed to initialize test mode: {e}")
return False
def _initialize_repository(self) -> bool:
"""Initialize OSTree repository"""
try:
repo_file = Gio.File.new_for_path(self.repo_path)
if not repo_file.query_exists(None):
# Create repository
self.logger.info(f"Creating OSTree repository at {self.repo_path}")
self.repo = OSTree.Repo.new(repo_file)
self.repo.create(OSTree.RepoMode.BARE, None)
else:
# Open existing repository
self.repo = OSTree.Repo.new(repo_file)
self.repo.open(None)
self.logger.info(f"OSTree repository initialized: {self.repo_path}")
return True
except Exception as e:
self.logger.error(f"Failed to initialize repository: {e}")
return False
def _setup_file_monitoring(self):
"""Setup file monitoring for sysroot changes"""
try:
# Monitor sysroot directory
sysroot_file = Gio.File.new_for_path(self.path)
self.monitor = sysroot_file.monitor_directory(
Gio.FileMonitorFlags.NONE, None
)
self.sig_changed = self.monitor.connect("changed", self._on_sysroot_changed)
self.logger.info("File monitoring setup for sysroot")
except Exception as e:
self.logger.warning(f"Failed to setup file monitoring: {e}")
def _on_sysroot_changed(self, monitor, file, other_file, event_type):
"""Handle sysroot file changes"""
try:
self.logger.debug(f"Sysroot changed: {file.get_path()} - {event_type}")
# Reload deployments if needed
if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
self._load_deployments()
except Exception as e:
self.logger.warning(f"Error handling sysroot change: {e}")
def _load_deployments(self):
"""Load deployments from sysroot"""
try:
if not self.ot_sysroot:
return
# Get deployments
deployments = self.ot_sysroot.get_deployments()
self.logger.info(f"Loaded {len(deployments)} deployments")
# Process deployments
for deployment in deployments:
self._process_deployment(deployment)
except Exception as e:
self.logger.error(f"Failed to load deployments: {e}")
def _process_deployment(self, deployment: OSTree.Deployment):
"""Process a deployment"""
try:
# Get deployment information
checksum = deployment.get_csum()
origin = deployment.get_origin()
booted = deployment.get_booted()
self.logger.debug(f"Deployment: {checksum} (booted: {booted})")
# Create OS interface if needed
if origin:
os_name = origin.get_string("origin", "refspec")
if os_name and os_name not in self.os_interfaces:
self._create_os_interface(os_name)
except Exception as e:
self.logger.warning(f"Failed to process deployment: {e}")
def _create_os_interface(self, os_name: str):
"""Create OS interface for given OS name"""
try:
# This would create a D-Bus interface for the OS
# Implementation depends on D-Bus interface structure
self.os_interfaces[os_name] = {
'name': os_name,
'deployments': [],
'cached_update': None
}
self.logger.info(f"Created OS interface for: {os_name}")
except Exception as e:
self.logger.error(f"Failed to create OS interface for {os_name}: {e}")
def lock(self) -> bool:
"""Lock the sysroot for exclusive access"""
if self.locked:
return True
try:
# In a real implementation, this would use OSTree's locking mechanism
self.locked = True
self.lock_thread = threading.current_thread()
self.logger.info("Sysroot locked")
return True
except Exception as e:
self.logger.error(f"Failed to lock sysroot: {e}")
return False
def unlock(self):
"""Unlock the sysroot"""
if not self.locked:
return
try:
self.locked = False
self.lock_thread = None
self.logger.info("Sysroot unlocked")
except Exception as e:
self.logger.error(f"Failed to unlock sysroot: {e}")
def clone(self) -> 'AptOstreeSysroot':
"""Create a clone of the sysroot for transaction use"""
try:
# Create new instance with same configuration
clone = AptOstreeSysroot(self.config, self.logger)
# Initialize without file monitoring
clone.ot_sysroot = OSTree.Sysroot.new(Gio.File.new_for_path(self.path))
clone.ot_sysroot.load(None)
clone.repo = OSTree.Repo.new(Gio.File.new_for_path(self.repo_path))
clone.repo.open(None)
self.logger.info("Sysroot cloned for transaction")
return clone
except Exception as e:
self.logger.error(f"Failed to clone sysroot: {e}")
raise
def get_deployments(self) -> List[Dict[str, Any]]:
"""Get list of deployments"""
try:
if not self.ot_sysroot:
return []
deployments = self.ot_sysroot.get_deployments()
deployment_list = []
for deployment in deployments:
deployment_info = {
'checksum': deployment.get_csum(),
'booted': deployment.get_booted(),
'pinned': deployment.get_pinned(),
'origin': {}
}
# Get origin information
origin = deployment.get_origin()
if origin:
deployment_info['origin'] = {
'refspec': origin.get_string("origin", "refspec"),
'description': origin.get_string("origin", "description")
}
deployment_list.append(deployment_info)
return deployment_list
except Exception as e:
self.logger.error(f"Failed to get deployments: {e}")
return []
def get_booted_deployment(self) -> Optional[Dict[str, Any]]:
"""Get currently booted deployment"""
try:
deployments = self.get_deployments()
for deployment in deployments:
if deployment['booted']:
return deployment
return None
except Exception as e:
self.logger.error(f"Failed to get booted deployment: {e}")
return None
def create_deployment(self, commit: str, origin_refspec: str = None) -> Optional[str]:
"""Create a new deployment"""
try:
if not self.ot_sysroot or not self.repo:
raise Exception("Sysroot or repository not initialized")
# Create deployment
deployment = self.ot_sysroot.deploy_tree(
origin_refspec or "debian/apt-ostree",
commit,
None, # origin
None, # override_origin
None, # flags
None # cancellable
)
self.logger.info(f"Created deployment: {deployment.get_csum()}")
return deployment.get_csum()
except Exception as e:
self.logger.error(f"Failed to create deployment: {e}")
return None
def set_default_deployment(self, checksum: str) -> bool:
"""Set default deployment"""
try:
if not self.ot_sysroot:
return False
# Find deployment by checksum
deployments = self.ot_sysroot.get_deployments()
for deployment in deployments:
if deployment.get_csum() == checksum:
# Set as default
self.ot_sysroot.set_default_deployment(deployment)
self.logger.info(f"Set default deployment: {checksum}")
return True
self.logger.error(f"Deployment not found: {checksum}")
return False
except Exception as e:
self.logger.error(f"Failed to set default deployment: {e}")
return False
def cleanup_deployments(self, keep_count: int = 2) -> int:
"""Clean up old deployments"""
try:
if not self.ot_sysroot:
return 0
deployments = self.ot_sysroot.get_deployments()
# Keep booted and pinned deployments
to_keep = []
to_remove = []
for deployment in deployments:
if deployment.get_booted() or deployment.get_pinned():
to_keep.append(deployment)
else:
to_remove.append(deployment)
# Keep the most recent non-booted deployments
to_remove.sort(key=lambda d: d.get_csum(), reverse=True)
to_keep.extend(to_remove[:keep_count])
# Remove old deployments
removed_count = 0
for deployment in to_remove[keep_count:]:
try:
self.ot_sysroot.delete_deployment(deployment, None)
removed_count += 1
except Exception as e:
self.logger.warning(f"Failed to remove deployment {deployment.get_csum()}: {e}")
self.logger.info(f"Cleaned up {removed_count} deployments")
return removed_count
except Exception as e:
self.logger.error(f"Failed to cleanup deployments: {e}")
return 0
def shutdown(self):
"""Shutdown the sysroot"""
try:
self.logger.info("Shutting down sysroot")
# Unlock if locked
if self.locked:
self.unlock()
# Cleanup file monitoring
if self.monitor and self.sig_changed:
self.monitor.disconnect(self.sig_changed)
self.monitor = None
# Close repository
if self.repo:
self.repo = None
# Close sysroot
if self.ot_sysroot:
self.ot_sysroot = None
self.logger.info("Sysroot shutdown complete")
except Exception as e:
self.logger.error(f"Error during sysroot shutdown: {e}")
def get_status(self) -> Dict[str, Any]:
"""Get sysroot status"""
return {
'path': self.path,
'repo_path': self.repo_path,
'locked': self.locked,
'deployments_count': len(self.get_deployments()),
'booted_deployment': self.get_booted_deployment(),
'os_interfaces_count': len(self.os_interfaces)
}