particle-os-tools/src/apt-ostree.py/python/apt_ostree_dbus/interface_simple.py
Joe Particle 130b406ee2 Fix async bug in D-Bus interface and update documentation
- Fixed critical 'asyncio.run() cannot be called from a running event loop' error
- Converted all D-Bus methods to async def with proper await usage
- Fixed dbus-next integration for full async functionality
- Daemon now properly handles concurrent async operations
- Updated TODO.md and CHANGELOG.md to reflect async bug fix
- Added interface_simple.py with dbus-next implementation
- Updated D-Bus policy for development use
- Added test script for D-Bus properties verification
2025-07-16 06:05:30 +00:00

303 lines
No EOL
10 KiB
Python

#!/usr/bin/env python3
"""
Simplified D-Bus interface for apt-ostree using dbus-next
Following rpm-ostree architectural patterns
"""
import asyncio
import json
import logging
from typing import Dict, Any, List, Optional
from dbus_next import BusType, DBusError
from dbus_next.aio import MessageBus
from dbus_next.service import ServiceInterface, method, signal
from dbus_next.signature import Variant
from core.sysroot import AptOstreeSysroot
from utils.shell_integration import ShellIntegration
class AptOstreeSysrootInterface(ServiceInterface):
"""Sysroot interface following rpm-ostree patterns"""
def __init__(self, daemon_instance):
super().__init__("org.debian.aptostree1.Sysroot")
self.daemon = daemon_instance
self.shell_integration = ShellIntegration()
self.logger = logging.getLogger('dbus.sysroot')
# Properties following rpm-ostree pattern
self._booted = "/"
self._path = "/"
self._active_transaction = None
self._automatic_update_policy = "none"
self._deployments = {}
@method()
def GetProperty(self, property_name: 's') -> 's':
"""Get property value as JSON string"""
try:
if property_name == "Booted":
return json.dumps(self._booted)
elif property_name == "Path":
return json.dumps(self._path)
elif property_name == "ActiveTransaction":
return json.dumps(self._active_transaction or "")
elif property_name == "AutomaticUpdatePolicy":
return json.dumps(self._automatic_update_policy)
elif property_name == "Deployments":
return json.dumps(self._deployments)
else:
return json.dumps({'error': f'Unknown property: {property_name}'})
except Exception as e:
return json.dumps({'error': str(e)})
@method()
def SetProperty(self, property_name: 's', value: 's') -> 's':
"""Set property value"""
try:
if property_name == "AutomaticUpdatePolicy":
if value in ["none", "check", "stage"]:
self._automatic_update_policy = value
self.logger.info(f"Update policy set to: {value}")
return json.dumps({'success': True})
else:
return json.dumps({'error': f'Invalid update policy: {value}'})
else:
return json.dumps({'error': f'Property {property_name} is read-only'})
except Exception as e:
return json.dumps({'error': str(e)})
@method()
async def InstallPackages(self, packages: 'as', live_install: 'b' = False) -> 's':
"""Install packages using apt-layer.sh"""
try:
self.logger.info(f"Installing packages: {packages}")
# Start transaction
transaction_id = f"install_{int(asyncio.get_event_loop().time())}"
self._active_transaction = transaction_id
# Execute installation
result = await self.shell_integration.install_packages(packages, live_install)
# Add transaction info
result['transaction_id'] = transaction_id
# Clear transaction
self._active_transaction = None
return json.dumps(result)
except Exception as e:
self._active_transaction = None
self.logger.error(f"InstallPackages failed: {e}")
return json.dumps({'success': False, 'error': str(e)})
@method()
async def RemovePackages(self, packages: 'as', live_remove: 'b' = False) -> 's':
"""Remove packages using apt-layer.sh"""
try:
self.logger.info(f"Removing packages: {packages}")
# Start transaction
transaction_id = f"remove_{int(asyncio.get_event_loop().time())}"
self._active_transaction = transaction_id
# Execute removal
result = await self.shell_integration.remove_packages(packages, live_remove)
# Add transaction info
result['transaction_id'] = transaction_id
# Clear transaction
self._active_transaction = None
return json.dumps(result)
except Exception as e:
self._active_transaction = None
self.logger.error(f"RemovePackages failed: {e}")
return json.dumps({'success': False, 'error': str(e)})
@method()
async def CreateComposeFSLayer(self, source_dir: 's', layer_path: 's', digest_store: 's') -> 's':
"""Create ComposeFS layer"""
try:
self.logger.info(f"Creating ComposeFS layer: {source_dir} -> {layer_path}")
# Start transaction
transaction_id = f"composefs_{int(asyncio.get_event_loop().time())}"
self._active_transaction = transaction_id
# Execute ComposeFS creation
result = await self.shell_integration.create_composefs_layer(source_dir, layer_path, digest_store)
# Add transaction info
result['transaction_id'] = transaction_id
# Clear transaction
self._active_transaction = None
return json.dumps(result)
except Exception as e:
self._active_transaction = None
self.logger.error(f"CreateComposeFSLayer failed: {e}")
return json.dumps({'success': False, 'error': str(e)})
@signal()
def TransactionStarted(self, transaction_id: 's', operation: 's') -> None:
"""Signal emitted when transaction starts"""
pass
@signal()
def TransactionFinished(self, transaction_id: 's', success: 'b', error: 's') -> None:
"""Signal emitted when transaction finishes"""
pass
class AptOstreeOSInterface(ServiceInterface):
"""OS interface following rpm-ostree patterns"""
def __init__(self, daemon_instance):
super().__init__("org.debian.aptostree1.OS")
self.daemon = daemon_instance
self.shell_integration = ShellIntegration()
self.logger = logging.getLogger('dbus.os')
@method()
async def Deploy(self, revision: 's') -> 's':
"""Deploy specific revision"""
try:
self.logger.info(f"Deploying revision: {revision}")
# Start transaction
transaction_id = f"deploy_{int(asyncio.get_event_loop().time())}"
# Execute deployment
result = await self.shell_integration.execute_apt_layer_command(
"deploy", [revision]
)
# Add transaction info
result['transaction_id'] = transaction_id
return json.dumps(result)
except Exception as e:
self.logger.error(f"Deploy failed: {e}")
return json.dumps({'success': False, 'error': str(e)})
@method()
async def Upgrade(self) -> 's':
"""Upgrade to latest version"""
try:
self.logger.info("Starting system upgrade")
# Start transaction
transaction_id = f"upgrade_{int(asyncio.get_event_loop().time())}"
# Execute upgrade
result = await self.shell_integration.execute_apt_layer_command(
"upgrade", []
)
# Add transaction info
result['transaction_id'] = transaction_id
return json.dumps(result)
except Exception as e:
self.logger.error(f"Upgrade failed: {e}")
return json.dumps({'success': False, 'error': str(e)})
@method()
async def Rollback(self) -> 's':
"""Rollback to previous deployment"""
try:
self.logger.info("Starting system rollback")
# Start transaction
transaction_id = f"rollback_{int(asyncio.get_event_loop().time())}"
# Execute rollback
result = await self.shell_integration.execute_apt_layer_command(
"rollback", []
)
# Add transaction info
result['transaction_id'] = transaction_id
return json.dumps(result)
except Exception as e:
self.logger.error(f"Rollback failed: {e}")
return json.dumps({'success': False, 'error': str(e)})
class SimpleAptOstreeDaemon:
"""Simplified apt-ostree daemon using dbus-next"""
def __init__(self):
self.logger = logging.getLogger('daemon')
self.bus = None
self.sysroot_interface = None
self.os_interface = None
self.running = False
# Initialize components
self.shell_integration = ShellIntegration()
async def start(self):
"""Start the daemon"""
try:
self.logger.info("Starting apt-ostree daemon with dbus-next")
# Connect to system bus
self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
# Create interfaces
self.sysroot_interface = AptOstreeSysrootInterface(self)
self.os_interface = AptOstreeOSInterface(self)
# Export interfaces
self.bus.export('/org/debian/aptostree1/Sysroot', self.sysroot_interface)
self.bus.export('/org/debian/aptostree1/OS', self.os_interface)
# Request bus name
await self.bus.request_name('org.debian.aptostree1')
self.running = True
self.logger.info("Daemon started successfully")
# Keep running
while self.running:
await asyncio.sleep(1)
except Exception as e:
self.logger.error(f"Failed to start daemon: {e}")
raise
async def stop(self):
"""Stop the daemon"""
self.logger.info("Stopping daemon")
self.running = False
if self.bus:
await self.bus.disconnect()
self.logger.info("Daemon stopped")
async def main():
"""Main entry point"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
daemon = SimpleAptOstreeDaemon()
try:
await daemon.start()
except KeyboardInterrupt:
await daemon.stop()
if __name__ == "__main__":
asyncio.run(main())