particle-os-tools/src/apt-ostree.py/python/utils/shell_integration.py
Joe Particle 5c7a697ea4
Some checks failed
Compile apt-layer (v2) / compile (push) Failing after 3h9m6s
Phase 3: Testing & Cleanup - Complete D-Bus Integration Testing
- D-Bus methods, properties, and signals all working correctly
- Shell integration tests pass 16/19 tests
- Core daemon fully decoupled from D-Bus dependencies
- Clean architecture with thin D-Bus wrappers established
- Signal emission using correct dbus-next pattern
- Updated test scripts for apt-ostreed service name
- Fixed dbus-next signal definitions and emission patterns
- Updated TODO and CHANGELOG for Phase 3 completion
2025-07-17 04:32:52 +00:00

383 lines
No EOL
15 KiB
Python

#!/usr/bin/env python3
"""
Shell integration utilities with progress callback support
"""
import asyncio
import json
import logging
import subprocess
from typing import Dict, List, Any, Optional, Callable
class ShellIntegration:
    """Async wrapper around the ``apt-layer.sh`` command-line script.

    Every operation spawns the script as a subprocess, captures its output
    and returns a plain ``dict`` describing the outcome.  The async methods
    do not raise for operational failures (missing script, non-zero exit
    code, ...): such failures are reported through the returned dict with
    ``'success': False``.

    Progress is reported through an optional ``callback(progress, message)``
    where ``progress`` is a percentage in the range 0.0-100.0.  A callback
    passed to an individual method takes precedence over the one given to
    the constructor.
    """

    # Default location of the apt-layer.sh script.  Override per instance
    # with the ``apt_layer_path`` constructor argument.
    DEFAULT_APT_LAYER_PATH = "/home/joe/particle-os-tools/apt-layer.sh"

    def __init__(
        self,
        progress_callback: Optional[Callable[[float, str], None]] = None,
        apt_layer_path: Optional[str] = None
    ):
        """Create the integration helper.

        Args:
            progress_callback: Default callback used when a method is called
                without its own ``progress_callback``.
            apt_layer_path: Path of the ``apt-layer.sh`` script to execute.
                Falls back to ``DEFAULT_APT_LAYER_PATH`` when omitted.
        """
        self.logger = logging.getLogger('shell.integration')
        self.progress_callback = progress_callback
        self.apt_layer_path = apt_layer_path or self.DEFAULT_APT_LAYER_PATH

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _report_progress(
        self,
        progress: float,
        message: str,
        callback: Optional[Callable[[float, str], None]] = None
    ):
        """Report progress via ``callback`` or, if omitted, the instance callback.

        Exceptions raised by the callback are logged and swallowed so that a
        faulty progress listener can never abort the operation itself.
        """
        target = callback or self.progress_callback
        if target:
            try:
                target(progress, message)
            except Exception as e:
                self.logger.error(f"Progress callback failed: {e}")

    @staticmethod
    def _format_command(cmd: Optional[List[Any]]) -> str:
        """Return ``cmd`` as a display string, or ``'unknown'`` if it was never built."""
        if not cmd:
            return 'unknown'
        # str() each item so formatting cannot itself fail on non-str arguments.
        return ' '.join(str(part) for part in cmd)

    @staticmethod
    def _package_fields(
        state: str,
        packages: List[str],
        launch_error: Optional[str] = None
    ) -> Dict[str, Any]:
        """Build the package-specific result fields for install/remove.

        Args:
            state: ``'installed'`` or ``'removed'``; used to name the keys.
            packages: Packages that were actually processed (empty on failure).
            launch_error: Text of the exception that aborted the operation,
                if any; recorded in ``'errors'``.
        """
        errors = [] if launch_error is None else [launch_error]
        return {
            f'{state}_packages': packages,
            'warnings': [],
            'errors': errors,
            'details': {
                f'packages_{state}': len(packages),
                'warnings_count': 0,
                'errors_count': len(errors)
            }
        }

    async def _execute(
        self,
        cmd: List[str],
        action: str,
        callback: Optional[Callable[[float, str], None]]
    ) -> Dict[str, Any]:
        """Run ``cmd`` to completion and return the common result fields.

        Reports the 10%, 20% and 90% progress steps.  Any exception raised
        while spawning or waiting for the process propagates to the caller.
        """
        self._report_progress(10.0, f"Executing apt-layer.sh {action} command", callback)
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        self._report_progress(20.0, "Command executing, monitoring output", callback)
        # communicate() drains both pipes concurrently and waits for exit.
        stdout, stderr = await process.communicate()
        self._report_progress(90.0, "Processing command results", callback)
        return {
            'success': process.returncode == 0,
            'stdout': stdout.decode('utf-8', errors='replace'),
            'stderr': stderr.decode('utf-8', errors='replace'),
            'error': None,
            'exit_code': process.returncode,
            'command': self._format_command(cmd)
        }

    def _complete(
        self,
        result: Dict[str, Any],
        failure_prefix: str,
        success_message: str,
        callback: Optional[Callable[[float, str], None]]
    ) -> Dict[str, Any]:
        """Fill in ``'message'``/``'error'`` from the exit status and report 100%."""
        if result['success']:
            result['message'] = success_message
        else:
            result['error'] = f"Command failed with exit code {result['exit_code']}"
            result['message'] = f"{failure_prefix} failed: {result['error']}"
        self._report_progress(100.0, result['message'], callback)
        return result

    def _failure(
        self,
        exc: Exception,
        failure_prefix: str,
        cmd: Optional[List[str]],
        callback: Optional[Callable[[float, str], None]]
    ) -> Dict[str, Any]:
        """Build the result for an operation aborted by ``exc`` and report it.

        Progress is reset to 0.0 and ``'exit_code'`` is ``-1`` because the
        command did not produce an exit status of its own.
        """
        error_msg = f"{failure_prefix} failed: {str(exc)}"
        self._report_progress(0.0, error_msg, callback)
        return {
            'success': False,
            'error': str(exc),
            'message': error_msg,
            'stdout': '',
            'stderr': '',
            'exit_code': -1,
            'command': self._format_command(cmd)
        }

    # ------------------------------------------------------------------
    # Async operations
    # ------------------------------------------------------------------

    async def install_packages(
        self,
        packages: List[str],
        live_install: bool = False,
        progress_callback: Optional[Callable[[float, str], None]] = None
    ) -> Dict[str, Any]:
        """Install packages with progress reporting.

        Runs ``<apt_layer_path> layer install <packages...>``.

        Args:
            packages: Names of the packages to install.
            live_install: Accepted for interface compatibility; currently
                not forwarded to the command.
            progress_callback: Per-call callback; falls back to the
                instance callback when omitted.

        Returns:
            Result dict with ``success``, ``stdout``, ``stderr``, ``error``,
            ``exit_code``, ``command``, ``message``, ``installed_packages``,
            ``warnings``, ``errors`` and ``details``.
        """
        # Use provided callback or fall back to instance callback
        callback = progress_callback or self.progress_callback
        cmd: Optional[List[str]] = None
        try:
            self._report_progress(0.0, f"Preparing to install {len(packages)} packages", callback)
            cmd = [self.apt_layer_path, "layer", "install", *packages]
            result = await self._execute(cmd, "install", callback)
            installed = list(packages) if result['success'] else []
            result.update(self._package_fields('installed', installed))
            return self._complete(
                result,
                "Installation",
                f"Successfully installed {len(packages)} packages",
                callback
            )
        except Exception as e:
            # Boundary handler: callers expect a result dict, never an exception.
            result = self._failure(e, "Installation", cmd, callback)
            result.update(self._package_fields('installed', [], str(e)))
            return result

    async def remove_packages(
        self,
        packages: List[str],
        live_remove: bool = False,
        progress_callback: Optional[Callable[[float, str], None]] = None
    ) -> Dict[str, Any]:
        """Remove packages with progress reporting.

        Runs ``<apt_layer_path> layer remove <packages...>``.

        Args:
            packages: Names of the packages to remove.
            live_remove: Accepted for interface compatibility; currently
                not forwarded to the command.
            progress_callback: Per-call callback; falls back to the
                instance callback when omitted.

        Returns:
            Result dict with ``success``, ``stdout``, ``stderr``, ``error``,
            ``exit_code``, ``command``, ``message``, ``removed_packages``,
            ``warnings``, ``errors`` and ``details``.
        """
        callback = progress_callback or self.progress_callback
        cmd: Optional[List[str]] = None
        try:
            self._report_progress(0.0, f"Preparing to remove {len(packages)} packages", callback)
            cmd = [self.apt_layer_path, "layer", "remove", *packages]
            result = await self._execute(cmd, "remove", callback)
            removed = list(packages) if result['success'] else []
            result.update(self._package_fields('removed', removed))
            return self._complete(
                result,
                "Removal",
                f"Successfully removed {len(packages)} packages",
                callback
            )
        except Exception as e:
            # Boundary handler: callers expect a result dict, never an exception.
            result = self._failure(e, "Removal", cmd, callback)
            result.update(self._package_fields('removed', [], str(e)))
            return result

    async def deploy_layer(
        self,
        deployment_id: str,
        progress_callback: Optional[Callable[[float, str], None]] = None
    ) -> Dict[str, Any]:
        """Deploy a specific layer with progress reporting.

        Runs ``<apt_layer_path> deploy <deployment_id>``.

        Returns:
            Result dict with ``success``, ``stdout``, ``stderr``, ``error``,
            ``exit_code``, ``command``, ``message`` and ``deployment_id``.
        """
        callback = progress_callback or self.progress_callback
        cmd: Optional[List[str]] = None
        try:
            self._report_progress(0.0, f"Preparing to deploy {deployment_id}", callback)
            cmd = [self.apt_layer_path, "deploy", deployment_id]
            result = await self._execute(cmd, "deploy", callback)
            result['deployment_id'] = deployment_id
            return self._complete(result, "Deployment", "Deployment completed", callback)
        except Exception as e:
            # Boundary handler: callers expect a result dict, never an exception.
            result = self._failure(e, "Deployment", cmd, callback)
            result['deployment_id'] = deployment_id
            return result

    async def upgrade_system(
        self,
        progress_callback: Optional[Callable[[float, str], None]] = None
    ) -> Dict[str, Any]:
        """Upgrade the system with progress reporting.

        Runs ``<apt_layer_path> upgrade``.

        Returns:
            Result dict with ``success``, ``stdout``, ``stderr``, ``error``,
            ``exit_code``, ``command`` and ``message``.
        """
        callback = progress_callback or self.progress_callback
        cmd: Optional[List[str]] = None
        try:
            self._report_progress(0.0, "Preparing system upgrade", callback)
            cmd = [self.apt_layer_path, "upgrade"]
            result = await self._execute(cmd, "upgrade", callback)
            return self._complete(result, "Upgrade", "System upgrade completed", callback)
        except Exception as e:
            # Boundary handler: callers expect a result dict, never an exception.
            return self._failure(e, "Upgrade", cmd, callback)

    async def rollback_system(
        self,
        progress_callback: Optional[Callable[[float, str], None]] = None
    ) -> Dict[str, Any]:
        """Rollback the system with progress reporting.

        Runs ``<apt_layer_path> rollback``.

        Returns:
            Result dict with ``success``, ``stdout``, ``stderr``, ``error``,
            ``exit_code``, ``command`` and ``message``.
        """
        callback = progress_callback or self.progress_callback
        cmd: Optional[List[str]] = None
        try:
            self._report_progress(0.0, "Preparing system rollback", callback)
            cmd = [self.apt_layer_path, "rollback"]
            result = await self._execute(cmd, "rollback", callback)
            return self._complete(result, "Rollback", "System rollback completed", callback)
        except Exception as e:
            # Boundary handler: callers expect a result dict, never an exception.
            return self._failure(e, "Rollback", cmd, callback)

    # ------------------------------------------------------------------
    # Legacy synchronous methods for backward compatibility
    #
    # Each wrapper drives the coroutine with asyncio.run(), which starts a
    # fresh event loop; they therefore must not be called from code that is
    # already running inside an event loop.
    # ------------------------------------------------------------------

    def install_packages_sync(self, packages: List[str], live_install: bool = False) -> Dict[str, Any]:
        """Synchronous version of ``install_packages`` for backward compatibility."""
        return asyncio.run(self.install_packages(packages, live_install))

    def remove_packages_sync(self, packages: List[str], live_remove: bool = False) -> Dict[str, Any]:
        """Synchronous version of ``remove_packages`` for backward compatibility."""
        return asyncio.run(self.remove_packages(packages, live_remove))

    def deploy_layer_sync(self, deployment_id: str) -> Dict[str, Any]:
        """Synchronous version of ``deploy_layer`` for backward compatibility."""
        return asyncio.run(self.deploy_layer(deployment_id))

    def get_system_status_sync(self) -> Dict[str, Any]:
        """Placeholder kept for backward compatibility.

        WARNING: despite its name this does NOT query status; it runs
        ``upgrade_system()`` and returns that result.
        """
        # TODO: replace with a real, read-only status query once one exists.
        self.logger.warning(
            "get_system_status_sync is a placeholder and performs a system upgrade"
        )
        return asyncio.run(self.upgrade_system())

    def rollback_layer_sync(self) -> Dict[str, Any]:
        """Synchronous version of ``rollback_system`` for backward compatibility."""
        return asyncio.run(self.rollback_system())