cleanup system/user modes, finish install/remove, add second local refresh for retrieving info after install/remove without refreshing entire data set

This commit is contained in:
GloriousEggroll 2025-03-24 23:29:14 -06:00
parent 5c1efedc7c
commit d6d27c54b6
2 changed files with 250 additions and 150 deletions

View file

@ -191,9 +191,10 @@ class AppStreamPackage:
class AppstreamSearcher:
"""Flatpak AppStream Package seacher"""
def __init__(self) -> None:
def __init__(self, refresh=False) -> None:
self.remotes: dict[str, list[AppStreamPackage]] = {}
self.refresh_progress = 0
self.refresh = refresh
# Define category groups and their titles
self.category_groups = {
@ -239,7 +240,8 @@ class AppstreamSearcher:
packages = []
metadata = AppStream.Metadata.new()
metadata.set_format_style(AppStream.FormatStyle.CATALOG)
inst.update_appstream_full_sync(remote.get_name(), None, None, True)
if self.refresh:
inst.update_appstream_full_sync(remote.get_name(), None, None, True)
appstream_file = Path(remote.get_appstream_dir().get_path() + "/appstream.xml.gz")
if appstream_file.exists():
metadata.parse_file(Gio.File.new_for_path(appstream_file.as_posix()), AppStream.FormatKind.XML)
@ -284,7 +286,7 @@ class AppstreamSearcher:
return search_results
def get_all_apps(self, repo_name=None, user=None) -> list[AppStreamPackage]:
def get_all_apps(self, repo_name=None) -> list[AppStreamPackage]:
"""Get all available apps from specified or all repositories"""
all_packages = []
if repo_name:
@ -295,9 +297,9 @@ class AppstreamSearcher:
all_packages.extend(self.remotes[remote_name])
return all_packages
def get_categories_summary(self, repo_name=None, user=None) -> dict:
def get_categories_summary(self, repo_name=None) -> dict:
"""Get a summary of all apps grouped by category"""
apps = self.get_all_apps(repo_name,user)
apps = self.get_all_apps(repo_name)
categories = {}
for app in apps:
@ -308,32 +310,24 @@ class AppstreamSearcher:
return categories
def get_installed_apps(self, user=None) -> list[tuple[str, str, str]]:
def get_installed_apps(self, system=False) -> list[tuple[str, str, str]]:
"""Get a list of all installed Flatpak applications with their repository source"""
installed_refs = []
if not user:
installation = get_installation()
else:
installation = get_installation("system")
installation = get_installation(system)
def process_installed_refs(inst: Flatpak.Installation, repo_type: str):
def process_installed_refs(inst: Flatpak.Installation, system=False):
for ref in inst.list_installed_refs_by_kind(Flatpak.RefKind.APP):
app_id = ref.format_ref()
# Get remote name from the installation
remote_name = ref.get_origin()
# Handle cases where remote might be None
if not remote_name:
remote_name = repo_type.capitalize()
installed_refs.append((app_id, remote_name, repo_type))
if system is False:
installed_refs.append((app_id, remote_name, "user"))
else:
installed_refs.append((app_id, remote_name, "system"))
# Process both system-wide and user installations
if not user:
process_installed_refs(installation, "user")
else:
process_installed_refs(installation, "system")
process_installed_refs(installation, system)
# Remove duplicates while maintaining order
seen = set()
@ -342,32 +336,24 @@ class AppstreamSearcher:
return unique_installed
def check_updates(self, user=None) -> list[tuple[str, str, str]]:
def check_updates(self, system=False) -> list[tuple[str, str, str]]:
"""Check for available updates for installed Flatpak applications"""
updates = []
if not user:
installation = get_installation()
else:
installation = get_installation("system")
installation = get_installation(system)
def check_updates_for_install(inst: Flatpak.Installation, repo_type: str):
def check_updates_for_install(inst: Flatpak.Installation, system=False):
for ref in inst.list_installed_refs_for_update(None):
app_id = ref.get_name()
# Get remote name from the installation
remote_name = ref.get_origin()
# Handle cases where remote might be None
if not remote_name:
remote_name = repo_type.capitalize()
updates.append((remote_name, app_id, repo_type))
if system is False:
updates.append((app_id, remote_name, "user"))
else:
updates.append((app_id, remote_name, "system"))
# Process both system-wide and user installations
if not user:
check_updates_for_install(installation, "user")
else:
check_updates_for_install(installation, "system")
check_updates_for_install(installation, system)
return updates
@ -442,7 +428,41 @@ class AppstreamSearcher:
self.collection_results = updated_results
def retrieve_metadata(self, user=None):
def refresh_local(self, system=False):
# make sure to reset these to empty before refreshing.
self.installed_results = [] # Initialize empty list
self.updates_results = [] # Initialize empty list
total_categories = sum(len(categories) for categories in self.category_groups.values())
current_category = 0
# Search for each app in local repositories
searcher = get_reposearcher(system)
search_result = []
for group_name, categories in self.category_groups.items():
# Process categories one at a time to keep GUI responsive
for category, title in categories.items():
if "installed" in category:
installed_apps = searcher.get_installed_apps()
for app_id, repo_name, repo_type in installed_apps:
parts = app_id.split('/')
app_id = parts[parts.index('app') + 1]
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.installed_results.extend(search_result)
elif "updates" in category:
updates = searcher.check_updates()
for repo_name, app_id, repo_type in updates:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.updates_results.extend(search_result)
# Update progress bar
self.refresh_progress = (current_category / total_categories) * 100
# make sure to reset these to empty before refreshing.
return self.installed_results, self.updates_results
def retrieve_metadata(self, system=False, refresh=True):
# make sure to reset these to empty before refreshing.
self.category_results = [] # Initialize empty list
@ -452,12 +472,8 @@ class AppstreamSearcher:
total_categories = sum(len(categories) for categories in self.category_groups.values())
current_category = 0
# Search for each app in local repositories
if not user:
searcher = get_reposearcher()
else:
searcher = get_reposearcher("system")
searcher = get_reposearcher(system, refresh)
json_path = "collections_data.json"
search_result = []
@ -477,7 +493,6 @@ class AppstreamSearcher:
self.collection_results.extend(search_result)
except (IOError, json.JSONDecodeError) as e:
print(f"Error loading collections data: {str(e)}")
# Try to get apps from Flathub API if internet is available
if check_internet():
# Get modification time in seconds since epoch
@ -507,10 +522,9 @@ class AppstreamSearcher:
# Update progress bar
self.refresh_progress = (current_category / total_categories) * 100
else:
if "installed" in category:
installed_apps = searcher.get_installed_apps(user)
installed_apps = searcher.get_installed_apps()
for app_id, repo_name, repo_type in installed_apps:
parts = app_id.split('/')
app_id = parts[parts.index('app') + 1]
@ -518,7 +532,7 @@ class AppstreamSearcher:
search_result = searcher.search_flatpak(app_id, repo_name)
self.installed_results.extend(search_result)
elif "updates" in category:
updates = searcher.check_updates(user)
updates = searcher.check_updates()
for repo_name, app_id, repo_type in updates:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
@ -547,7 +561,7 @@ class AppstreamSearcher:
# make sure to reset these to empty before refreshing.
return self.category_results, self.collection_results, self.installed_results, self.updates_results
def install_flatpak(app: AppStreamPackage, repo_name=None, user=None) -> tuple[bool, str]:
def install_flatpak(app: AppStreamPackage, repo_name=None, system=False) -> tuple[bool, str]:
"""
Install a Flatpak package.
@ -562,12 +576,8 @@ def install_flatpak(app: AppStreamPackage, repo_name=None, user=None) -> tuple[b
if not repo_name:
repo_name = "flathub"
if not user:
installation = get_installation()
searcher = get_reposearcher()
else:
installation = get_installation("system")
searcher = get_reposearcher("system")
installation = get_installation(system)
searcher = get_reposearcher(system)
remote = installation.get_remote_by_name(repo_name)
if not remote:
@ -588,7 +598,7 @@ def install_flatpak(app: AppStreamPackage, repo_name=None, user=None) -> tuple[b
except GLib.Error as e:
return False, f"Installation failed: {e}"
def remove_flatpak(app: AppStreamPackage, repo_name=None, user=None) -> tuple[bool, str]:
def remove_flatpak(app: AppStreamPackage, repo_name=None, system=False) -> tuple[bool, str]:
"""
Remove a Flatpak package using transactions.
@ -603,40 +613,46 @@ def remove_flatpak(app: AppStreamPackage, repo_name=None, user=None) -> tuple[bo
repo_name = "flathub"
# Get the appropriate installation based on user parameter
if user is None:
installation = get_installation()
else:
installation = get_installation("system")
installation = get_installation(system)
searcher = get_reposearcher(system)
remote = installation.get_remote_by_name(repo_name)
if not remote:
return False, f"Repository '{repo_name}' not found."
# Create a new transaction for removal
transaction = Flatpak.Transaction.new_for_installation(installation)
details = app.get_details()
transaction.add_uninstall(details['id'])
installed_apps = searcher.get_installed_apps()
for ins_app_id_bundle, ins_repo_name, ins_repo_type in installed_apps:
parts = ins_app_id_bundle.split('/')
ins_app_id = parts[parts.index('app') + 1]
if ins_app_id == app.id:
# Create a new transaction for removal
transaction = Flatpak.Transaction.new_for_installation(installation)
transaction.add_uninstall(ins_app_id_bundle)
# Run the transaction
try:
transaction.run()
return True, f"Successfully removed {app.id}"
except GLib.Error as e:
return False, f"Failed to remove {app.id}: {e}"
return False, f"Application '{app.id}' is not installed."
# Run the transaction
try:
transaction.run()
return True, f"Successfully installed {app.id}"
except GLib.Error as e:
return False, f"Installation failed: {e}"
def get_installation(user=None):
if not user:
def get_installation(system=False):
if system is False:
installation = Flatpak.Installation.new_user()
else:
installation = Flatpak.Installation.new_system()
return installation
def get_reposearcher(user=None):
searcher = AppstreamSearcher()
if not user:
def get_reposearcher(system=False, refresh=False):
if system is False:
installation = get_installation()
else:
installation = get_installation("system")
installation = get_installation(True)
if refresh is True:
searcher = AppstreamSearcher(refresh)
else:
searcher = AppstreamSearcher()
searcher.add_installation(installation)
return searcher
@ -648,7 +664,7 @@ def check_internet():
except requests.ConnectionError:
return False
def repotoggle(repo, toggle=True, user=None):
def repotoggle(repo, toggle=True, system=False):
"""
Enable or disable a Flatpak repository
@ -659,10 +675,10 @@ def repotoggle(repo, toggle=True, user=None):
Returns:
tuple: (success, error_message)
"""
if not user:
if system is False:
installation = get_installation()
else:
installation = get_installation("system")
installation = get_installation(True)
try:
remote = installation.get_remote_by_name(repo)
@ -686,28 +702,28 @@ def repotoggle(repo, toggle=True, user=None):
except GLib.GError as e:
return False, f"Failed to toggle repository: {str(e)}."
def repolist(user=None):
if not user:
def repolist(system=False):
if system is False:
installation = get_installation()
else:
installation = get_installation("system")
installation = get_installation(True)
repos = installation.list_remotes()
return repos
def repodelete(repo, user=None):
if not user:
def repodelete(repo, system=False):
if system is False:
installation = get_installation()
else:
installation = get_installation("system")
installation = get_installation(True)
installation.remove_remote(repo)
def repoadd(repofile, user=None):
def repoadd(repofile, system=False):
"""Add a new repository using a .flatpakrepo file"""
# Get existing repositories
if not user:
if system is False:
installation = get_installation()
else:
installation = get_installation("system")
installation = get_installation(True)
existing_repos = installation.list_remotes()
if not repofile.endswith('.flatpakrepo'):

210
main.py
View file

@ -18,7 +18,9 @@ class MainWindow(Gtk.Window):
self.collection_results = [] # Initialize empty list
self.installed_results = [] # Initialize empty list
self.updates_results = [] # Initialize empty list
self.system_mode = None
self.system_mode = False
self.current_page = None # Track current page
self.current_group = None # Track current group (system/collections/categories)
# Set window size
self.set_default_size(1280, 720)
@ -162,10 +164,7 @@ class MainWindow(Gtk.Window):
def populate_repo_dropdown(self):
# Get list of repositories
if not self.system_mode:
libflatpak_query.repolist()
else:
libflatpak_query.repolist("system")
libflatpak_query.repolist(self.system_mode)
repos = libflatpak_query.repolist()
# Clear existing items
@ -203,15 +202,12 @@ class MainWindow(Gtk.Window):
# Show the dialog
dialog.show_all()
if not self.system_mode:
searcher = libflatpak_query.get_reposearcher()
else:
searcher = libflatpak_query.get_reposearcher("system")
searcher = libflatpak_query.get_reposearcher(self.system_mode)
# Define thread target function
def refresh_target():
try:
category_results, collection_results, installed_results, updates_results = searcher.retrieve_metadata()
category_results, collection_results, installed_results, updates_results = searcher.retrieve_metadata(self.system_mode)
self.category_results = category_results
self.collection_results = collection_results
self.installed_results = installed_results
@ -248,6 +244,65 @@ class MainWindow(Gtk.Window):
if not refresh_thread.is_alive() and dialog.is_active():
dialog.destroy()
def refresh_local(self):
# Create dialog and progress bar
dialog = Gtk.Dialog(
title="Refreshing local data, please wait...",
parent=self,
modal=True,
destroy_with_parent=True
)
dialog.set_size_request(400, 100)
progress_bar = Gtk.ProgressBar()
progress_bar.set_text("Initializing...")
progress_bar.set_show_text(True)
dialog.vbox.pack_start(progress_bar, True, True, 0)
dialog.vbox.set_spacing(12)
# Show the dialog
dialog.show_all()
searcher = libflatpak_query.get_reposearcher(self.system_mode)
# Define thread target function
def refresh_target():
try:
installed_results, updates_results = searcher.refresh_local(self.system_mode)
self.installed_results = installed_results
self.updates_results = updates_results
except Exception as e:
message_type = Gtk.MessageType.ERROR
dialog = Gtk.MessageDialog(
transient_for=None, # Changed from self
modal=True,
destroy_with_parent=True,
message_type=message_type,
buttons=Gtk.ButtonsType.OK,
text=f"Error updating progress: {str(e)}"
)
dialog.run()
dialog.destroy()
# Start the refresh thread
refresh_thread = threading.Thread(target=refresh_target)
refresh_thread.start()
def update_progress():
while refresh_thread.is_alive():
progress_bar.set_text("Refreshing...")
progress = searcher.refresh_progress
progress_bar.set_fraction(progress / 100)
return True
else:
progress_bar.set_fraction(100 / 100)
dialog.destroy()
# Start the progress update timer
GLib.timeout_add_seconds(0.5, update_progress)
dialog.run()
if not refresh_thread.is_alive() and dialog.is_active():
dialog.destroy()
def create_panels(self):
# Check if panels already exist
if hasattr(self, 'left_panel') and self.left_panel.get_parent():
@ -533,10 +588,16 @@ class MainWindow(Gtk.Window):
if widget.get_children()[0].get_label() == display_title:
widget.get_style_context().add_class("dark-category-button-active")
break
self.current_page = category
self.current_group = group
self.update_category_header(category)
self.show_category_apps(category)
def refresh_current_page(self):
"""Refresh the currently displayed page"""
if self.current_page and self.current_group:
self.on_category_clicked(self.current_page, self.current_group)
def update_category_header(self, category):
"""Update the category header text based on the selected category."""
if category in self.category_groups['collections']:
@ -687,10 +748,7 @@ class MainWindow(Gtk.Window):
self.right_container.pack_start(header_bar, False, False, 0)
# Get list of repositories
if not self.system_mode:
repos = libflatpak_query.repolist()
else:
repos = libflatpak_query.repolist("system")
repos = libflatpak_query.repolist(self.system_mode)
# Create a scrolled window for repositories
scrolled_window = Gtk.ScrolledWindow()
@ -897,11 +955,10 @@ class MainWindow(Gtk.Window):
def on_install_clicked(self, button, app):
"""Handle the Install button click with installation options"""
details = app.get_details()
app_id = details['id']
# Create dialog
dialog = Gtk.Dialog(
title=f"Install {details['name']}",
title=f"Install {details['name']}?",
transient_for=self,
modal=True,
destroy_with_parent=True,
@ -919,23 +976,21 @@ class MainWindow(Gtk.Window):
repo_combo = Gtk.ComboBoxText()
repo_combo.set_hexpand(True)
content_area.pack_start(Gtk.Label(label=f"Install: {details['id']}?"), False, False, 0)
# Search for available repositories containing this app
if not self.system_mode:
searcher = libflatpak_query.get_reposearcher()
searcher = libflatpak_query.get_reposearcher(self.system_mode)
if self.system_mode is False:
content_area.pack_start(Gtk.Label(label="Installation Type: User"), False, False, 0)
else:
searcher = libflatpak_query.get_reposearcher("system")
content_area.pack_start(Gtk.Label(label="Installation Type: System"), False, False, 0)
# Populate repository dropdown
available_repos = set()
if not self.system_mode:
repos = libflatpak_query.repolist()
else:
repos = libflatpak_query.repolist("system")
repos = libflatpak_query.repolist(self.system_mode)
for repo in repos:
if not repo.get_disabled():
search_results = searcher.search_flatpak(app_id, repo.get_name())
search_results = searcher.search_flatpak(details['id'], repo.get_name())
if search_results:
available_repos.add(repo)
@ -972,12 +1027,11 @@ class MainWindow(Gtk.Window):
# Perform installation
# Get selected values
if not self.system_mode:
if self.system_mode is False:
print(f"Installing {details['name']} for User from {selected_repo}")
success, message = libflatpak_query.install_flatpak(app.id, selected_repo)
else:
print(f"Installing {details['name']} for System from {selected_repo}")
success, message = libflatpak_query.install_flatpak(app.id, selected_repo, "system")
success, message = libflatpak_query.install_flatpak(app, selected_repo, self.system_mode)
message_type=Gtk.MessageType.INFO
if not success:
message_type=Gtk.MessageType.ERROR
@ -990,17 +1044,65 @@ class MainWindow(Gtk.Window):
buttons=Gtk.ButtonsType.OK,
text=message
)
self.refresh_local()
finished_dialog.run()
finished_dialog.destroy()
self.refresh_current_page()
dialog.destroy()
def on_remove_clicked(self, button, app):
"""Handle the Remove button click"""
"""Handle the Remove button click with removal options"""
details = app.get_details()
print(f"Removing application: {details['name']}")
# Implement removal logic here
# Example:
# Flatpak.uninstall(app_id=details['id'])
# Create dialog
dialog = Gtk.Dialog(
title=f"Remove {details['name']}?",
transient_for=self,
modal=True,
destroy_with_parent=True,
)
# Add buttons using the new method
dialog.add_button("Cancel", Gtk.ResponseType.CANCEL)
dialog.add_button("Remove", Gtk.ResponseType.OK)
# Create content area
content_area = dialog.get_content_area()
content_area.set_spacing(12)
content_area.set_border_width(12)
content_area.pack_start(Gtk.Label(label=f"Remove: {details['id']}?"), False, False, 0)
# Show dialog
dialog.show_all()
# Run dialog
response = dialog.run()
if response == Gtk.ResponseType.OK:
# Perform Removal
# Get selected values
if self.system_mode is False:
print(f"Removing {details['name']} for User.")
else:
print(f"Removing {details['name']} for System.")
success, message = libflatpak_query.remove_flatpak(app, self.system_mode)
message_type=Gtk.MessageType.INFO
if not success:
message_type=Gtk.MessageType.ERROR
if message:
finished_dialog = Gtk.MessageDialog(
transient_for=self,
modal=True,
destroy_with_parent=True,
message_type=message_type,
buttons=Gtk.ButtonsType.OK,
text=message
)
self.refresh_local()
finished_dialog.run()
finished_dialog.destroy()
self.refresh_current_page()
dialog.destroy()
def on_update_clicked(self, button, app):
"""Handle the Update button click"""
@ -1034,13 +1136,10 @@ class MainWindow(Gtk.Window):
checkbox.get_parent().set_sensitive(True)
if checkbox.get_active():
checkbox.get_style_context().remove_class("dim-label")
if not self.system_mode:
success, message = libflatpak_query.repotoggle(repo.get_name(), True)
else:
success, message = libflatpak_query.repotoggle(repo.get_name(), True, "system")
success, message = libflatpak_query.repotoggle(repo.get_name(), True, self.system_mode)
message_type = Gtk.MessageType.INFO
if success:
self.refresh_data()
self.refresh_local()
else:
if message:
message_type = Gtk.MessageType.ERROR
@ -1057,13 +1156,10 @@ class MainWindow(Gtk.Window):
dialog.destroy()
else:
checkbox.get_style_context().add_class("dim-label")
if not self.system_mode:
success, message = libflatpak_query.repotoggle(repo.get_name(), False)
else:
success, message = libflatpak_query.repotoggle(repo.get_name(), False, "system")
success, message = libflatpak_query.repotoggle(repo.get_name(), False, self.system_mode)
message_type = Gtk.MessageType.INFO
if success:
self.refresh_data()
self.refresh_local()
else:
if message:
message_type = Gtk.MessageType.ERROR
@ -1095,11 +1191,8 @@ class MainWindow(Gtk.Window):
if response == Gtk.ResponseType.YES:
try:
if not self.system_mode:
libflatpak_query.repodelete(repo.get_name())
else:
libflatpak_query.repodelete(repo.get_name(), "system")
self.refresh_data()
libflatpak_query.repodelete(repo.get_name(), self.system_mode)
self.refresh_local()
self.show_category_apps('repositories')
except GLib.GError as e:
# Handle polkit authentication failure
@ -1131,10 +1224,7 @@ class MainWindow(Gtk.Window):
def on_add_flathub_repo_button_clicked(self, button):
"""Handle the Add Flathub Repository button click"""
# Add the repository
if not self.system_mode:
success, error_message = libflatpak_query.repoadd("https://dl.flathub.org/repo/flathub.flatpakrepo")
else:
success, error_message = libflatpak_query.repoadd("https://dl.flathub.org/repo/flathub.flatpakrepo", "system")
success, error_message = libflatpak_query.repoadd("https://dl.flathub.org/repo/flathub.flatpakrepo", self.system_mode)
if error_message:
error_dialog = Gtk.MessageDialog(
transient_for=None, # Changed from self
@ -1146,16 +1236,13 @@ class MainWindow(Gtk.Window):
)
error_dialog.run()
error_dialog.destroy()
self.refresh_data()
self.refresh_local()
self.show_category_apps('repositories')
def on_add_flathub_beta_repo_button_clicked(self, button):
"""Handle the Add Flathub Beta Repository button click"""
# Add the repository
if not self.system_mode:
success, error_message = libflatpak_query.repoadd("https://dl.flathub.org/beta-repo/flathub-beta.flatpakrepo")
else:
success, error_message = libflatpak_query.repoadd("https://dl.flathub.org/beta-repo/flathub-beta.flatpakrepo", "system")
success, error_message = libflatpak_query.repoadd("https://dl.flathub.org/beta-repo/flathub-beta.flatpakrepo", self.system_mode)
if error_message:
error_dialog = Gtk.MessageDialog(
transient_for=None, # Changed from self
@ -1167,7 +1254,7 @@ class MainWindow(Gtk.Window):
)
error_dialog.run()
error_dialog.destroy()
self.refresh_data()
self.refresh_local()
self.show_category_apps('repositories')
def on_add_repo_button_clicked(self, button):
@ -1205,10 +1292,7 @@ class MainWindow(Gtk.Window):
if response == Gtk.ResponseType.OK and repo_file_path:
# Add the repository
if not self.system_mode:
success, error_message = libflatpak_query.repoadd(repo_file_path)
else:
success, error_message = libflatpak_query.repoadd(repo_file_path, "system")
success, error_message = libflatpak_query.repoadd(repo_file_path, self.system_mode)
if error_message:
error_dialog = Gtk.MessageDialog(
transient_for=None, # Changed from self
@ -1220,7 +1304,7 @@ class MainWindow(Gtk.Window):
)
error_dialog.run()
error_dialog.destroy()
self.refresh_data()
self.refresh_local()
self.show_category_apps('repositories')
def select_default_category(self):