fixup issues when switching between system and user mode

This commit is contained in:
GloriousEggroll 2025-04-13 19:29:53 -06:00
parent 6de6ea1bea
commit d2673b686e
2 changed files with 113 additions and 49 deletions

View file

@ -16,11 +16,16 @@ from pathlib import Path
from html.parser import HTMLParser
import requests
import os
import pwd
from datetime import datetime
class MainWindow(Gtk.Window):
def __init__(self):
def __init__(self, system_mode=False):
super().__init__(title="Flatpost")
self.system_mode = system_mode
self.system_switch = Gtk.Switch()
if self.system_mode:
self.system_switch.set_active(True)
# Step 1: Verify file exists and is accessible
icon_path = "/usr/share/icons/hicolor/1024x1024/apps/com.flatpost.flatpostapp.png"
@ -57,7 +62,6 @@ class MainWindow(Gtk.Window):
self.collection_results = [] # Initialize empty list
self.installed_results = [] # Initialize empty list
self.updates_results = [] # Initialize empty list
self.system_mode = False
self.current_page = None # Track current page
self.current_group = None # Track current group (system/collections/categories)
@ -599,7 +603,6 @@ class MainWindow(Gtk.Window):
system_box.set_margin_end(0)
system_box.set_halign(Gtk.Align.CENTER)
self.system_switch = Gtk.Switch()
self.system_switch.props.valign = Gtk.Align.CENTER
self.system_switch.connect("notify::active", self.on_system_mode_toggled)
self.system_switch.set_hexpand(False)
@ -706,18 +709,67 @@ class MainWindow(Gtk.Window):
self.current_component_type = None
self.refresh_current_page()
def relaunch_as_user(self):
uid = int(os.environ.get('ORIG_USER', ''))
try:
pw_record = pwd.getpwuid(uid)
username = pw_record.pw_name
user_home = pw_record.pw_dir
gid = pw_record.pw_gid
# Drop privileges before exec
os.setgid(gid)
os.setuid(uid)
# Update environment
os.environ["HOME"] = user_home
os.environ["LOGNAME"] = username
os.environ["USER"] = username
os.environ["XDG_RUNTIME_DIR"] = f"/run/user/{uid}"
# Re-exec the script
script_path = Path(__file__).resolve()
os.execvp(
sys.executable,
[sys.executable, str(script_path)]
)
except Exception as e:
print(f"Failed to drop privileges and exec: {e}")
sys.exit(1)
def on_system_mode_toggled(self, switch, gparam):
"""Handle system mode toggle switch state changes"""
desired_state = switch.get_active()
if desired_state:
# Request superuser validation
# Get current script path
current_script = sys.argv[0]
# Re-execute as root with system mode enabled
try:
#subprocess.run(['pkexec', 'true'], check=True)
self.system_mode = True
self.refresh_data()
self.refresh_current_page()
# Construct command to re-execute with system mode enabled
script_path = Path(__file__).resolve()
os.execvp(
"pkexec",
[
"pkexec",
"--disable-internal-agent",
"env",
f"DISPLAY={os.environ['DISPLAY']}",
f"XAUTHORITY={os.environ.get('XAUTHORITY', '')}",
f"XDG_CURRENT_DESKTOP={os.environ.get('XDG_CURRENT_DESKTOP', '').lower()}",
f"ORIG_USER={os.getuid()!s}",
f"PKEXEC_UID={os.getuid()!s}",
"G_MESSAGES_DEBUG=none",
sys.executable,
str(script_path),
'--system-mode',
]
)
except subprocess.CalledProcessError:
# Authentication failed, reset switch and show error
switch.set_active(False)
dialog = Gtk.MessageDialog(
transient_for=self,
@ -729,19 +781,28 @@ class MainWindow(Gtk.Window):
dialog.connect("response", lambda d, r: d.destroy())
dialog.show()
else:
if self.system_mode == True:
self.system_mode = False
self.refresh_data()
self.refresh_current_page()
elif self.system_mode == False:
self.system_mode = True
self.refresh_data()
self.refresh_current_page()
try:
# Construct command to re-execute with system mode enabled
self.relaunch_as_user()
sys.exit(0)
except subprocess.CalledProcessError:
# Authentication failed, reset switch and show error
switch.set_active(True)
dialog = Gtk.MessageDialog(
transient_for=self,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.OK,
text="Authentication failed",
secondary_text="Could not enable user mode"
)
dialog.connect("response", lambda d, r: d.destroy())
dialog.show()
def populate_repo_dropdown(self):
# Get list of repositories
fp_turbo.repolist(self.system_mode)
repos = fp_turbo.repolist()
repos = fp_turbo.repolist(self.system_mode)
# Clear existing items
self.repo_dropdown.remove_all()
@ -2768,7 +2829,6 @@ class MainWindow(Gtk.Window):
indicator = Gtk.Label(label="* = global override", xalign=1.0)
indicator.get_style_context().add_class("permissions-global-indicator")
# Add other sections with correct permission types
self._add_section(app_id, listbox, "Shared", "shared", [
("Network", "network", "Can communicate over network"),
@ -2903,7 +2963,6 @@ class MainWindow(Gtk.Window):
success, perms = fp_turbo.list_other_perm_toggles(app_id, perm_type, self.system_mode)
if not success:
perms = {"paths": []}
if section_options:
# Add options
for display_text, option, description in section_options:
@ -3701,7 +3760,7 @@ class MainWindow(Gtk.Window):
self.system_mode
)
elif perm_type == "filesystems":
success, message = fp_turbo.remove_file_permissions(
success, message = fp_turbo.global_remove_file_permissions(
path,
"filesystems",
True,
@ -4431,12 +4490,20 @@ class MainWindow(Gtk.Window):
self.on_category_clicked('trending', 'collections')
def main():
# Initialize GTK before anything else
if not Gtk.init_check():
print("Failed to initialize GTK")
return 1
system_mode = False
# Check for command line argument
if len(sys.argv) > 1:
arg = sys.argv[1]
if arg == '--system-mode':
system_mode = True
if arg.endswith('.flatpakref'):
# Create a temporary window just to handle the installation
app = MainWindow()
app = MainWindow(system_mode=system_mode)
app.handle_flatpakref_file(arg)
# Keep the window open for 5 seconds to show the result
GLib.timeout_add_seconds(5, Gtk.main_quit)
@ -4444,13 +4511,13 @@ def main():
return
if arg.endswith('.flatpakrepo'):
# Create a temporary window just to handle the installation
app = MainWindow()
app = MainWindow(system_mode=system_mode)
app.handle_flatpakrepo_file(arg)
# Keep the window open for 5 seconds to show the result
GLib.timeout_add_seconds(5, Gtk.main_quit)
Gtk.main()
return
app = MainWindow()
app = MainWindow(system_mode=system_mode)
app.connect("destroy", Gtk.main_quit)
app.show_all()
Gtk.main()

View file

@ -442,6 +442,7 @@ class AppstreamSearcher:
search_results = []
packages = self.remotes[repo_name]
found = None
for package in packages:
# Try matching exact ID first
if keyword is package.id:
@ -783,7 +784,7 @@ class AppstreamSearcher:
for group_name, categories in self.category_groups.items():
for category, title in categories.items():
if category not in self.category_groups['system']:
self._process_category(searcher, category, current_category, total_categories)
self._process_category(searcher, category, current_category, total_categories, system)
else:
self._process_system_category(searcher, category, system)
current_category += 1
@ -791,11 +792,11 @@ class AppstreamSearcher:
return self._get_current_results()
def _process_category(self, searcher, category, current_category, total_categories):
def _process_category(self, searcher, category, current_category, total_categories, system=False):
"""Process a single category and retrieve its metadata."""
if self._should_refresh():
self._refresh_category_data(searcher, category)
self._refresh_category_data(searcher, category, system)
app_data_dir = Path.home() / ".local" / "share" / "flatpost"
app_data_dir.mkdir(parents=True, exist_ok=True)
@ -805,7 +806,7 @@ class AppstreamSearcher:
collections_data = json.load(f)
self._update_from_collections(collections_data, category)
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Error loading collections data: {str(e)}")
pass
self.refresh_progress = (current_category / total_categories) * 100
@ -1190,14 +1191,21 @@ def download_repo(url):
def get_metadata_path(app_id: str | None, override=False, system=False) -> str:
metadata_path = ""
if override:
if app_id:
# Get the application's metadata file
installation = get_installation(system)
app_path = installation.get_current_installed_app(app_id).get_deploy_dir()
if not app_path:
print(f"Application {app_id} not found")
return metadata_path
metadata_path = app_path + "/metadata"
elif override:
if system:
metadata_path = "/var/lib/flatpak/overrides/global"
if not os.path.exists(metadata_path):
os.makedirs(os.path.dirname(metadata_path), exist_ok=True)
with open(metadata_path, 'w') as f:
pass
else:
home_dir = os.path.expanduser("~")
metadata_path = f"{home_dir}/.local/share/flatpak/overrides/global"
@ -1205,16 +1213,6 @@ def get_metadata_path(app_id: str | None, override=False, system=False) -> str:
os.makedirs(os.path.dirname(metadata_path), exist_ok=True)
with open(metadata_path, 'w') as f:
pass
elif app_id:
# Get the application's metadata file
installation = get_installation(system)
app_path = installation.get_current_installed_app(app_id).get_deploy_dir()
if not app_path:
print(f"Application {app_id} not found")
return metadata_path
metadata_path = app_path + "/metadata"
if not os.path.exists(metadata_path):
print(f"Metadata file not found for {app_id}")
return metadata_path
@ -1249,9 +1247,8 @@ def add_file_permissions(app_id: str, path: str, perm_type=None, system=False) -
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(app_id, system)
key_file = get_perm_key_file(app_id, False, system)
perm_type = perm_type or "filesystems"
# Handle special case for home directory
if path.lower() == "host":
@ -1320,7 +1317,7 @@ def remove_file_permissions(app_id: str, path: str, perm_type=None, system=False
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(app_id, system)
key_file = get_perm_key_file(app_id, False, system)
perm_type = perm_type or "filesystems"
# Handle special case for home directory
@ -1389,7 +1386,7 @@ def list_file_perms(app_id: str, system=False) -> tuple[bool, dict[str, list[str
- 'special_paths': list of special paths (home, host, etc.)
"""
try:
key_file = get_perm_key_file(app_id, system)
key_file = get_perm_key_file(app_id, False, system)
# Initialize result dictionary
result = {
@ -1430,7 +1427,7 @@ def list_other_perm_toggles(app_id: str, perm_type: str, system=False) -> tuple[
- 'paths': list of filesystem paths
"""
try:
key_file = get_perm_key_file(app_id, system)
key_file = get_perm_key_file(app_id, False, system)
# Initialize result dictionary
result = {
@ -1475,7 +1472,7 @@ def toggle_other_perms(app_id: str, perm_type: str, option: str, enable: bool, s
bool: True if successful, False if operation failed
"""
# Get the KeyFile object
key_file = get_perm_key_file(app_id, system)
key_file = get_perm_key_file(app_id, False, system)
if not key_file:
return False, f"Failed to get permissions for {app_id}"
@ -1541,7 +1538,7 @@ def list_other_perm_values(app_id: str, perm_type: str, system=False) -> tuple[b
- 'paths': list of environment variables
"""
try:
key_file = get_perm_key_file(app_id, system)
key_file = get_perm_key_file(app_id, False, system)
# Initialize result dictionary
result = {
@ -1594,7 +1591,7 @@ def add_permission_value(app_id: str, perm_type: str, value: str, system=False)
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(app_id, system)
key_file = get_perm_key_file(app_id, False, system)
# Convert perm_type to the correct format
match perm_type.lower():
@ -1642,7 +1639,7 @@ def remove_permission_value(app_id: str, perm_type: str, value: str, system=Fals
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(app_id, system)
key_file = get_perm_key_file(app_id, False, system)
# Convert perm_type to the correct format
match perm_type.lower():