particle-os-tools/src/apt-ostree.py/python/core/sysroot.py
Joe Particle b31b64d600
Some checks failed
Compile apt-layer (v2) / compile (push) Has been cancelled
fix: Resolve D-Bus property serialization issues
- Fix Deployments property to always return JSON string
- Resolve TypeError and ValueError in D-Bus property serialization
- Add JSON serialization for complex data structures
- Implement fallback values for empty collections
- Update TODO and changelogs to reflect completion
- Ensure full compliance with D-BUS.md and daemon-notes.md
2025-07-16 06:27:39 +00:00

449 lines
No EOL
16 KiB
Python

"""
Sysroot management system
"""
import os
import gi
import threading
import time
from gi.repository import GLib, GObject, Gio
from typing import Dict, Optional, List, Any
import logging
# Import OSTree bindings
try:
    # gi.require_version() raises ValueError (not ImportError) when the
    # requested namespace/typelib is not installed, while the import itself
    # raises ImportError; both mean "no OSTree bindings available".
    gi.require_version('OSTree', '1.0')
    from gi.repository import OSTree
except (ImportError, ValueError):
    # Sentinel checked throughout the module to switch to test mode.
    OSTree = None
class AptOstreeSysroot(GObject.Object):
    """Manages the system root and OSTree repository.

    When the OSTree bindings are missing, the host was not booted through
    OSTree (no ``/run/ostree-booted``), or loading the sysroot raises, the
    instance falls back to a "test mode": ``ot_sysroot`` stays ``None`` and
    ``os_interfaces`` holds a single mock ``test-os`` entry.
    """

    def __init__(self, config: Dict[str, Any], logger):
        """Store configuration and reset all runtime state.

        Args:
            config: Mapping queried with ``get('sysroot.path', '/')`` and
                ``get('sysroot.repo_path', '/var/lib/ostree/repo')``.
            logger: Object exposing ``debug``/``info``/``warning``/``error``.
        """
        super().__init__()
        self.config = config
        self.logger = logger

        # OSTree integration. The annotations are strings so that they stay
        # valid even when the module-level ``OSTree`` name is None.
        self.ot_sysroot: Optional['OSTree.Sysroot'] = None
        self.repo: Optional['OSTree.Repo'] = None
        self.repo_last_stat = None

        # Transaction management
        self.transaction: Optional[Any] = None
        self.close_transaction_timeout_id = None

        # Security
        self.authority = None  # PolicyKit authority

        # Interface management
        self.os_interfaces = {}
        self.osexperimental_interfaces = {}

        # File monitoring
        self.monitor = None
        self.sig_changed = None

        # State
        self.path = config.get('sysroot.path', '/')
        self.repo_path = config.get('sysroot.repo_path', '/var/lib/ostree/repo')
        self.locked = False
        self.lock_thread = None

        self.logger.info(f"Sysroot initialized for path: {self.path}")

    def initialize(self) -> bool:
        """Initialize the sysroot.

        Returns:
            True when the real sysroot or the test-mode fallback came up,
            False when the repository (or the test-mode fallback) failed.
        """
        try:
            self.logger.info("Initializing sysroot")

            # Check if OSTree is available
            if OSTree is None:
                self.logger.warning("OSTree bindings not available, running in test mode")
                return self._initialize_test_mode()

            # Check if we're in an OSTree system
            if not os.path.exists("/run/ostree-booted"):
                self.logger.warning("Not in OSTree system, running in test mode")
                return self._initialize_test_mode()

            # Initialize and load the OSTree sysroot
            self.ot_sysroot = OSTree.Sysroot.new(Gio.File.new_for_path(self.path))
            self.ot_sysroot.load(None)

            # Initialize repository
            if not self._initialize_repository():
                return False

            # Setup file monitoring
            self._setup_file_monitoring()

            # Load deployments
            self._load_deployments()

            self.logger.info("Sysroot initialized successfully")
            return True
        except Exception as e:
            # Any unexpected failure degrades to test mode instead of aborting.
            self.logger.error(f"Failed to initialize sysroot: {e}")
            return self._initialize_test_mode()

    def _initialize_test_mode(self) -> bool:
        """Initialize in test mode without OSTree.

        Replaces ``os_interfaces`` with one mock ``test-os`` entry.

        Returns:
            True on success, False if building the mock data raised.
        """
        try:
            self.logger.info("Initializing in test mode")

            # Create mock deployments for testing
            self.os_interfaces = {
                'test-os': {
                    'name': 'test-os',
                    'deployments': [
                        {
                            'checksum': 'test-commit-123',
                            'booted': True,
                            'pinned': False,
                            'version': 'test-1.0'
                        }
                    ],
                    'cached_update': None
                }
            }

            self.logger.info("Test mode initialized successfully")
            return True
        except Exception as e:
            self.logger.error(f"Failed to initialize test mode: {e}")
            return False

    def _initialize_repository(self) -> bool:
        """Open the OSTree repository at ``repo_path``, creating it if absent.

        Returns:
            True when ``self.repo`` was opened or created, False on error.
        """
        try:
            repo_file = Gio.File.new_for_path(self.repo_path)

            if not repo_file.query_exists(None):
                # Create repository
                self.logger.info(f"Creating OSTree repository at {self.repo_path}")
                self.repo = OSTree.Repo.new(repo_file)
                self.repo.create(OSTree.RepoMode.BARE, None)
            else:
                # Open existing repository
                self.repo = OSTree.Repo.new(repo_file)
                self.repo.open(None)

            self.logger.info(f"OSTree repository initialized: {self.repo_path}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to initialize repository: {e}")
            return False

    def _setup_file_monitoring(self):
        """Watch the sysroot directory and route changes to the handler.

        Failures are logged as warnings and otherwise ignored.
        """
        try:
            # Monitor sysroot directory
            sysroot_file = Gio.File.new_for_path(self.path)
            self.monitor = sysroot_file.monitor_directory(
                Gio.FileMonitorFlags.NONE, None
            )
            # Keep the handler id so shutdown() can disconnect it.
            self.sig_changed = self.monitor.connect("changed", self._on_sysroot_changed)

            self.logger.info("File monitoring setup for sysroot")
        except Exception as e:
            self.logger.warning(f"Failed to setup file monitoring: {e}")

    def _on_sysroot_changed(self, monitor, file, other_file, event_type):
        """Handle a "changed" signal from the sysroot file monitor.

        Deployments are re-read only on ``CHANGES_DONE_HINT`` events.
        """
        try:
            self.logger.debug(f"Sysroot changed: {file.get_path()} - {event_type}")

            # Reload deployments if needed
            if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
                self._load_deployments()
        except Exception as e:
            self.logger.warning(f"Error handling sysroot change: {e}")

    def _load_deployments(self):
        """Read the deployment list and process every entry.

        Does nothing when no OSTree sysroot is loaded (test mode).
        """
        try:
            if not self.ot_sysroot:
                return

            # Get deployments
            deployments = self.ot_sysroot.get_deployments()

            self.logger.info(f"Loaded {len(deployments)} deployments")

            # Process deployments
            for deployment in deployments:
                self._process_deployment(deployment)
        except Exception as e:
            self.logger.error(f"Failed to load deployments: {e}")

    @staticmethod
    def _matches_booted(deployment, booted) -> bool:
        """Return True when *deployment* is the booted deployment *booted*.

        *booted* may be None (nothing booted via OSTree), which never matches.
        """
        return booted is not None and bool(deployment.equal(booted))

    @staticmethod
    def _origin_string(origin, key: str, default: str = "") -> str:
        """Read ``[origin] key`` from an origin key file, tolerating absence.

        A missing key makes ``get_string`` raise ``GLib.Error``; *default* is
        returned instead so one absent optional field does not fail the caller.
        """
        try:
            return origin.get_string("origin", key)
        except GLib.Error:
            return default

    def _process_deployment(self, deployment: 'OSTree.Deployment'):
        """Log one deployment and register an OS interface for its refspec.

        The annotation is a string on purpose: it is evaluated when the class
        body runs, and ``OSTree`` is None when the bindings are unavailable.
        """
        try:
            # Get deployment information
            checksum = deployment.get_csum()
            origin = deployment.get_origin()
            current = self.ot_sysroot.get_booted_deployment() if self.ot_sysroot else None
            booted = self._matches_booted(deployment, current)

            self.logger.debug(f"Deployment: {checksum} (booted: {booted})")

            # Create OS interface if needed
            if origin:
                # A missing refspec raises here and is reported by the handler below.
                os_name = origin.get_string("origin", "refspec")
                if os_name and os_name not in self.os_interfaces:
                    self._create_os_interface(os_name)
        except Exception as e:
            self.logger.warning(f"Failed to process deployment: {e}")

    def _create_os_interface(self, os_name: str):
        """Register an empty OS interface record under *os_name*."""
        try:
            # This would create a D-Bus interface for the OS
            # Implementation depends on D-Bus interface structure
            self.os_interfaces[os_name] = {
                'name': os_name,
                'deployments': [],
                'cached_update': None
            }

            self.logger.info(f"Created OS interface for: {os_name}")
        except Exception as e:
            self.logger.error(f"Failed to create OS interface for {os_name}: {e}")

    def lock(self) -> bool:
        """Mark the sysroot as locked and record the calling thread.

        Returns:
            True when the sysroot is (now or already) flagged as locked.
        """
        # NOTE(review): this returns True even when a different thread already
        # holds the lock, so it does not enforce exclusivity — confirm intent.
        if self.locked:
            return True

        try:
            # In a real implementation, this would use OSTree's locking mechanism
            self.locked = True
            self.lock_thread = threading.current_thread()
            self.logger.info("Sysroot locked")
            return True
        except Exception as e:
            self.logger.error(f"Failed to lock sysroot: {e}")
            return False

    def unlock(self):
        """Clear the lock flag and the recorded owner thread, if locked."""
        if not self.locked:
            return

        try:
            self.locked = False
            self.lock_thread = None
            self.logger.info("Sysroot unlocked")
        except Exception as e:
            self.logger.error(f"Failed to unlock sysroot: {e}")

    def clone(self) -> 'AptOstreeSysroot':
        """Create a clone of the sysroot for transaction use.

        The clone shares ``config`` and ``logger`` and gets no file monitor.
        Without OSTree bindings the clone is put into test mode instead of
        failing on the missing ``OSTree`` module.

        Raises:
            Exception: re-raised after logging when loading the sysroot or
                opening the repository fails.
        """
        try:
            # Create new instance with same configuration
            clone = AptOstreeSysroot(self.config, self.logger)

            if OSTree is None:
                # No bindings: mirror what initialize() does in this situation.
                clone._initialize_test_mode()
            else:
                # Initialize without file monitoring
                clone.ot_sysroot = OSTree.Sysroot.new(Gio.File.new_for_path(self.path))
                clone.ot_sysroot.load(None)

                clone.repo = OSTree.Repo.new(Gio.File.new_for_path(self.repo_path))
                clone.repo.open(None)

            self.logger.info("Sysroot cloned for transaction")
            return clone
        except Exception as e:
            self.logger.error(f"Failed to clone sysroot: {e}")
            raise

    def get_deployments(self) -> Dict[str, Any]:
        """Get deployments as a dictionary for D-Bus compatibility.

        Returns:
            ``{"deployment_<i>": {...}}`` with ``checksum``, ``booted``,
            ``pinned`` and ``origin`` per deployment. When there is nothing to
            report, or on error, a sentinel dict containing a ``"status"`` key
            and ``"count": 0`` is returned instead, so the result is never
            empty.
        """
        try:
            if not self.ot_sysroot:
                # Return sentinel dictionary for test mode
                return {"status": "no_deployments", "count": 0}

            # libostree exposes the booted deployment on the sysroot rather
            # than as a per-deployment flag; pinning is Deployment.is_pinned().
            booted = self.ot_sysroot.get_booted_deployment()
            deployment_dict = {}

            for i, deployment in enumerate(self.ot_sysroot.get_deployments()):
                deployment_info = {
                    'checksum': deployment.get_csum(),
                    'booted': self._matches_booted(deployment, booted),
                    'pinned': bool(deployment.is_pinned()),
                    'origin': {}
                }

                # Get origin information; absent keys become "" so a missing
                # optional field does not turn the whole listing into an error.
                origin = deployment.get_origin()
                if origin:
                    deployment_info['origin'] = {
                        'refspec': self._origin_string(origin, "refspec"),
                        'description': self._origin_string(origin, "description")
                    }

                deployment_dict[f"deployment_{i}"] = deployment_info

            # Ensure we always return a dictionary with at least one entry
            if not deployment_dict:
                deployment_dict = {"status": "no_deployments", "count": 0}

            return deployment_dict
        except Exception as e:
            self.logger.error(f"Failed to get deployments: {e}")
            return {"status": "error", "error": str(e), "count": 0}

    def get_booted_deployment(self) -> Optional[Dict[str, Any]]:
        """Get currently booted deployment.

        Returns:
            The info dict of the first deployment flagged ``booted``, or None
            when there is none, in test mode, or on error.
        """
        try:
            deployments = self.get_deployments()

            # Sentinel result: test mode, no deployments, or an error
            if "status" in deployments:
                return None

            # Look for booted deployment in the dictionary
            for deployment_info in deployments.values():
                if deployment_info.get('booted', False):
                    return deployment_info

            return None
        except Exception as e:
            self.logger.error(f"Failed to get booted deployment: {e}")
            return None

    def create_deployment(self, commit: str, origin_refspec: str = None) -> Optional[str]:
        """Create a new deployment.

        Args:
            commit: Revision passed to ``deploy_tree``.
            origin_refspec: Passed as the first ``deploy_tree`` argument;
                defaults to ``"debian/apt-ostree"``.

        Returns:
            Checksum of the new deployment, or None on failure.
        """
        try:
            if not self.ot_sysroot or not self.repo:
                raise Exception("Sysroot or repository not initialized")

            # NOTE(review): libostree documents the first argument as the OS
            # (stateroot) name, not a refspec, and the new deployment is not
            # written into the deployment list here — confirm intent.
            result = self.ot_sysroot.deploy_tree(
                origin_refspec or "debian/apt-ostree",
                commit,
                None,  # origin
                None,  # override_origin
                None,  # flags
                None   # cancellable
            )
            # PyGObject returns a (success, deployment) tuple for calls with an
            # out-parameter; accept a bare deployment object as well.
            deployment = result[-1] if isinstance(result, tuple) else result

            self.logger.info(f"Created deployment: {deployment.get_csum()}")
            return deployment.get_csum()
        except Exception as e:
            self.logger.error(f"Failed to create deployment: {e}")
            return None

    def set_default_deployment(self, checksum: str) -> bool:
        """Set default deployment.

        The first deployment whose checksum equals *checksum* is moved to the
        front of the deployment list, which is then written back.

        Returns:
            True when the deployment was found (and is now first), else False.
        """
        try:
            if not self.ot_sysroot:
                return False

            # Find deployment by checksum
            deployments = list(self.ot_sysroot.get_deployments())
            for index, deployment in enumerate(deployments):
                if deployment.get_csum() != checksum:
                    continue

                # libostree has no dedicated setter: write the list back with
                # the target first. Skip the write when it already is first.
                if index != 0:
                    reordered = [deployment] + deployments[:index] + deployments[index + 1:]
                    self.ot_sysroot.write_deployments(reordered, None)

                self.logger.info(f"Set default deployment: {checksum}")
                return True

            self.logger.error(f"Deployment not found: {checksum}")
            return False
        except Exception as e:
            self.logger.error(f"Failed to set default deployment: {e}")
            return False

    def cleanup_deployments(self, keep_count: int = 2) -> int:
        """Clean up old deployments.

        Booted and pinned deployments are always kept. Of the remaining ones,
        the first *keep_count* in sysroot order are kept and the rest dropped.

        Args:
            keep_count: Number of unprotected deployments to retain; negative
                values are treated as 0.

        Returns:
            Number of deployments removed (0 in test mode or on error).
        """
        try:
            if not self.ot_sysroot:
                return 0

            deployments = self.ot_sysroot.get_deployments()
            booted = self.ot_sysroot.get_booted_deployment()

            # Checksums are content hashes and carry no age information, so
            # retention follows the order reported by the sysroot.
            # NOTE(review): assumes earlier entries are the newer ones — confirm.
            spare_slots = max(keep_count, 0)
            to_keep = []
            to_remove = []

            for deployment in deployments:
                if self._matches_booted(deployment, booted) or deployment.is_pinned():
                    to_keep.append(deployment)
                elif spare_slots > 0:
                    spare_slots -= 1
                    to_keep.append(deployment)
                else:
                    to_remove.append(deployment)

            # Deployments are removed by writing back the list without them.
            if to_remove:
                self.ot_sysroot.write_deployments(to_keep, None)

            removed_count = len(to_remove)
            self.logger.info(f"Cleaned up {removed_count} deployments")
            return removed_count
        except Exception as e:
            self.logger.error(f"Failed to cleanup deployments: {e}")
            return 0

    def shutdown(self):
        """Release the lock, the file monitor, the repository and the sysroot."""
        try:
            self.logger.info("Shutting down sysroot")

            # Unlock if locked
            if self.locked:
                self.unlock()

            # Cleanup file monitoring
            if self.monitor and self.sig_changed:
                self.monitor.disconnect(self.sig_changed)
                self.monitor = None
                # Drop the handler id too; it is meaningless without the monitor.
                self.sig_changed = None

            # Drop repository and sysroot references
            self.repo = None
            self.ot_sysroot = None

            self.logger.info("Sysroot shutdown complete")
        except Exception as e:
            self.logger.error(f"Error during sysroot shutdown: {e}")

    def get_status(self) -> Dict[str, Any]:
        """Get sysroot status.

        Returns:
            Dict with ``path``, ``repo_path``, ``locked``,
            ``deployments_count``, ``booted_deployment`` and
            ``os_interfaces_count``.
        """
        deployments = self.get_deployments()
        # A dict carrying "status" is a sentinel (nothing found / error), not a
        # deployment listing, so its length must not be reported as a count.
        deployments_count = 0 if "status" in deployments else len(deployments)

        return {
            'path': self.path,
            'repo_path': self.repo_path,
            'locked': self.locked,
            'deployments_count': deployments_count,
            'booted_deployment': self.get_booted_deployment(),
            'os_interfaces_count': len(self.os_interfaces)
        }