add detection of installed or needing-updates flatpaks

This commit is contained in:
GloriousEggroll 2025-03-23 01:25:27 -06:00
parent c6d9939f10
commit 7a72f1a4af
2 changed files with 290 additions and 72 deletions

View file

@ -7,9 +7,7 @@ from gi.repository import Flatpak, GLib, Gio, AppStream
from pathlib import Path
import logging
from enum import IntEnum
from pathlib import Path
import argparse
import sys
# Set up logging
logging.basicConfig(level=logging.INFO)
@ -85,9 +83,6 @@ class AppStreamPackage:
def _get_icon_cache_path(self, size: str) -> str:
# Remove the file:// prefix
icon_filename = self._get_icon_filename()
# Appstream icon cache path for the flatpak repo queried
icon_cache_path = Path(self.remote.get_appstream_dir().get_path() + "/icons/flatpak/" + size + "/")
return str(icon_cache_path)
@ -198,10 +193,14 @@ class AppstreamSearcher:
search_results = []
packages = self.remotes[repo_name]
for package in packages:
found = package.search(keyword)
if found != Match.NONE:
logger.debug(f" found : {package} match: {found}")
package.match = found
# Try matching exact ID first
if keyword is package.id:
search_results.append(package)
# Try matching case insensitive ID next
elif keyword.lower() is package.id.lower():
search_results.append(package)
# General keyword search
elif keyword.lower() in str(package).lower():
search_results.append(package)
return search_results
@ -209,16 +208,19 @@ class AppstreamSearcher:
def search_flatpak(self, keyword: str, repo_name=None) -> list[AppStreamPackage]:
"""Search packages matching a keyword"""
search_results = []
keyword = keyword.lower()
if repo_name:
search_results = self.search_flatpak_repo(keyword, repo_name)
else:
for remote_name in self.remotes.keys():
results = self.search_flatpak_repo(keyword, remote_name)
for result in results:
search_results.append(result)
keyword = keyword
if repo_name and repo_name in self.remotes.keys():
results = self.search_flatpak_repo(keyword, repo_name)
for result in results:
search_results.append(result)
return search_results
for remote_name in self.remotes.keys():
results = self.search_flatpak_repo(keyword, remote_name)
for result in results:
search_results.append(result)
return search_results
def get_all_apps(self, repo_name: str = None) -> list[AppStreamPackage]:
"""Get all available apps from specified or all repositories"""
all_packages = []
@ -243,6 +245,65 @@ class AppstreamSearcher:
return categories
def get_installed_apps(self) -> list[tuple[str, str, str]]:
"""Get a list of all installed Flatpak applications with their repository source"""
installed_refs = []
# Get both system-wide and user installations
system_inst = Flatpak.Installation.new_system(None)
user_inst = Flatpak.Installation.new_user(None)
def process_installed_refs(inst: Flatpak.Installation, repo_type: str):
for ref in inst.list_installed_refs_by_kind(Flatpak.RefKind.APP):
app_id = ref.format_ref()
# Get remote name from the installation
remote_name = ref.get_origin()
# Handle cases where remote might be None
if not remote_name:
remote_name = repo_type.capitalize()
installed_refs.append((app_id, remote_name, repo_type))
# Process both system-wide and user installations
process_installed_refs(system_inst, "system")
process_installed_refs(user_inst, "user")
# Remove duplicates while maintaining order
seen = set()
unique_installed = [(ref, repo, repo_type) for ref, repo, repo_type in installed_refs
if not (ref in seen or seen.add(ref))]
return unique_installed
def check_updates(self) -> list[tuple[str, str, str]]:
"""Check for available updates for installed Flatpak applications"""
updates = []
# Get both system-wide and user installations
system_inst = Flatpak.Installation.new_system(None)
user_inst = Flatpak.Installation.new_user(None)
def check_updates_for_install(inst: Flatpak.Installation, repo_type: str):
for ref in inst.list_installed_refs_for_update(None):
app_id = ref.get_name()
# Get remote name from the installation
remote_name = ref.get_origin()
# Handle cases where remote might be None
if not remote_name:
remote_name = repo_type.capitalize()
updates.append((remote_name, app_id, repo_type))
# Check system-wide installation
check_updates_for_install(system_inst, "system")
# Check user installation
check_updates_for_install(user_inst, "user")
return updates
def main():
"""Main function demonstrating Flatpak information retrieval"""
@ -251,6 +312,10 @@ def main():
parser.add_argument('--repo', help='Filter results to specific repository')
parser.add_argument('--list-all', action='store_true', help='List all available apps')
parser.add_argument('--categories', action='store_true', help='Show apps grouped by category')
parser.add_argument('--list-installed', action='store_true',
help='List all installed Flatpak applications')
parser.add_argument('--check-updates', action='store_true',
help='Check for available updates')
args = parser.parse_args()
app_id = args.id
@ -265,6 +330,22 @@ def main():
installation = Flatpak.Installation.new_system(None)
searcher.add_installation(installation)
if args.list_installed:
installed_apps = searcher.get_installed_apps()
print(f"\nInstalled Flatpak Applications ({len(installed_apps)}):")
for app_id, repo_name, repo_type in installed_apps:
parts = app_id.split('/')
app_id = parts[parts.index('app') + 1]
print(f"{app_id} (Repository: {repo_name}, Installation: {repo_type})")
return
if args.check_updates:
updates = searcher.check_updates()
print(f"\nAvailable Updates ({len(updates)}):")
for repo_name, app_id, repo_type in updates:
print(f"{app_id} (Repository: {repo_name}, Installation: {repo_type})")
return
if list_all:
apps = searcher.get_all_apps(repo_filter)
for app in apps:

247
main.py
View file

@ -11,6 +11,8 @@ from urllib.parse import quote_plus
import libflatpak_query
from libflatpak_query import AppstreamSearcher, Flatpak
import json
import os
import time
class MainWindow(Gtk.Window):
def __init__(self):
@ -19,12 +21,19 @@ class MainWindow(Gtk.Window):
# Store search results as an instance variable
self.category_results = [] # Initialize empty list
self.collection_results = [] # Initialize empty list
self.installed_results = [] # Initialize empty list
self.updates_results = [] # Initialize empty list
# Set window size
self.set_default_size(1280, 720)
# Define category groups and their titles
self.category_groups = {
'system': {
'installed': 'Installed',
'updates': 'Updates',
'repositories': 'Repositories'
},
'collections': {
'trending': 'Trending',
'popular': 'Popular',
@ -100,6 +109,27 @@ class MainWindow(Gtk.Window):
# Select Trending by default
self.select_default_category()
def populate_repo_dropdown(self):
# Get list of repositories
installation = Flatpak.Installation.new_system()
repos = installation.list_remotes()
# Clear existing items
self.repo_dropdown.remove_all()
# Add repository names
for repo in repos:
self.repo_dropdown.append_text(repo.get_remote_name())
# Connect selection changed signal
self.repo_dropdown.connect("changed", self.on_repo_selected)
def on_repo_selected(self, dropdown):
active_index = dropdown.get_active()
if active_index != -1:
self.selected_repo = dropdown.get_model()[active_index][0]
print(f"Selected repository: {self.selected_repo}")
def update_collection_results(self, new_collection_results):
"""Update search results by replacing existing items and adding new ones."""
# Create a set of existing app_ids for efficient lookup
@ -154,54 +184,77 @@ class MainWindow(Gtk.Window):
# Search for each app in local repositories
searcher = AppstreamSearcher()
searcher.add_installation(Flatpak.Installation.new_user())
searcher.add_installation(Flatpak.Installation.new_system())
json_path = "collections_data.json"
search_result = []
for group_name, categories in self.category_groups.items():
# Process categories one at a time to keep GUI responsive
for category, title in categories.items():
if category not in self.category_groups['system']:
# Preload the currently saved collections data first
try:
with open(json_path, 'r', encoding='utf-8') as f:
collections_data = json.load(f)
for collection in collections_data:
if collection['category'] == category:
apps = [app['app_id'] for app in collection['data'].get('hits', [])]
for app_id in apps:
search_result = searcher.search_flatpak(app_id, 'flathub')
self.collection_results.extend(search_result)
except (IOError, json.JSONDecodeError) as e:
print(f"Error loading collections data: {str(e)}")
# Preload the currently saved collections data first
try:
with open("collections_data.json", 'r', encoding='utf-8') as f:
collections_data = json.load(f)
for collection in collections_data:
if collection['category'] == category:
apps = [app['app_id'] for app in collection['data'].get('hits', [])]
for app_id in apps:
# Try to get apps from Flathub API if internet is available
if self.check_internet():
# Get modification time in seconds since epoch
mod_time = os.path.getmtime(json_path)
# Calculate 24 hours in seconds
hours_24 = 24 * 3600
# Check if file is older than 24 hours
if (time.time() - mod_time) > hours_24:
api_data = self.fetch_flathub_category_apps(category)
if api_data:
apps = api_data['hits']
for app in apps:
app_id = app['app_id']
# Search for the app in local repositories
search_result = searcher.search_flatpak(app_id, 'flathub')
self.collection_results.extend(search_result)
except (IOError, json.JSONDecodeError) as e:
print(f"Error loading collections data: {str(e)}")
# Try to get apps from Flathub API if internet is available
if self.check_internet():
api_data = self.fetch_flathub_category_apps(category)
if api_data:
apps = api_data['hits']
self.category_results.extend(search_result)
else:
apps = searcher.get_all_apps('flathub')
for app in apps:
app_id = app['app_id']
# Search for the app in local repositories
search_result = searcher.search_flatpak(app_id, 'flathub')
self.category_results.extend(search_result)
details = app.get_details()
if category in details['categories']:
search_result = searcher.search_flatpak(details['name'], 'flathub')
self.category_results.extend(search_result)
current_category += 1
# Update progress bar
progress = (current_category / total_categories) * 100
progress_bar.set_fraction(progress / 100)
# Force GTK to process events
while Gtk.events_pending():
Gtk.main_iteration_do(False)
else:
apps = searcher.get_all_apps('flathub')
for app in apps:
details = app.get_details()
if category in details['categories']:
search_result = searcher.search_flatpak(details['name'], 'flathub')
self.category_results.extend(search_result)
current_category += 1
# Update progress bar
progress = (current_category / total_categories) * 100
progress_bar.set_fraction(progress / 100)
# Force GTK to process events
while Gtk.events_pending():
Gtk.main_iteration_do(False)
if "installed" in category:
installed_apps = searcher.get_installed_apps()
for app_id, repo_name, repo_type in installed_apps:
parts = app_id.split('/')
app_id = parts[parts.index('app') + 1]
if repo_name:
# Extend the existing list instead of creating a new one
search_result = searcher.search_flatpak(app_id, repo_name)
self.installed_results.extend(search_result)
elif "updates" in category:
updates = searcher.check_updates()
for repo_name, app_id, repo_type in updates:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.updates_results.extend(search_result)
self.save_collections_data()
# load collections from json file again
@ -211,7 +264,7 @@ class MainWindow(Gtk.Window):
for category, title in categories.items():
if category in self.category_groups['collections']:
try:
with open("collections_data.json", 'r', encoding='utf-8') as f:
with open(json_path, 'r', encoding='utf-8') as f:
collections_data = json.load(f)
for collection in collections_data:
if collection['category'] == category:
@ -342,7 +395,7 @@ class MainWindow(Gtk.Window):
self.category_header.get_style_context().add_class("panel-header")
self.category_header.set_hexpand(True)
self.category_header.set_halign(Gtk.Align.START)
self.right_panel.pack_start(self.category_header, False, False, 0) # Pack header first
self.right_panel.pack_start(self.category_header, False, False, 0)
# Create scrollable area
scrolled_window = Gtk.ScrolledWindow()
@ -355,8 +408,7 @@ class MainWindow(Gtk.Window):
self.right_container.set_border_width(6)
scrolled_window.add(self.right_container)
self.right_panel.pack_start(scrolled_window, True, True, 0) # Pack scrolled window second
self.right_panel.pack_start(scrolled_window, True, True, 0)
self.main_box.pack_end(self.right_panel, True, True, 0)
return self.right_container
@ -414,12 +466,36 @@ class MainWindow(Gtk.Window):
except IOError as e:
print(f"Error saving collections data: {str(e)}")
# Create and connect buttons
def create_button(self, label, callback, app, condition=None):
"""Create a button with optional visibility condition"""
button = Gtk.Button(label=label)
button.get_style_context().add_class("app-button")
if condition is not None:
if not condition(app):
return None
button.connect("clicked", callback, app)
return button
def clear_container(self, container):
"""Clear all widgets from a container"""
for child in container.get_children():
child.destroy()
def show_category_apps(self, category):
# Clear existing content
# Clear existing content properly
for child in self.right_container.get_children():
child.destroy()
# Initialize apps list
apps = []
# Load system data
if 'installed' in category:
apps.extend([app for app in self.installed_results])
if 'updates' in category:
apps.extend([app for app in self.updates_results])
# Load collections data
try:
with open("collections_data.json", 'r', encoding='utf-8') as f:
@ -438,23 +514,24 @@ class MainWindow(Gtk.Window):
]
# Filter apps based on presence in category
apps = [
apps.extend([
app for app in self.collection_results
if app.get_details()['id'] in app_ids_in_category
]
])
else:
# Fallback to previous behavior if category isn't in collections
apps = [
apps.extend([
app for app in self.collection_results
if category in app.get_details()['categories']
]
])
except (IOError, json.JSONDecodeError) as e:
print(f"Error reading collections data: {str(e)}")
apps = [
apps.extend([
app for app in self.collection_results
if category in app.get_details()['categories']
]
])
# Display each application
for app in apps:
details = app.get_details()
@ -471,8 +548,8 @@ class MainWindow(Gtk.Window):
# Create and add the icon
icon = Gtk.Image.new_from_file(f"{details['icon_path_64']}/{details['icon_filename']}")
icon.set_size_request(48, 48) # Set a reasonable size for the icon
icon_box.pack_start(icon, True, True, 0) # Add icon to the box
icon.set_size_request(48, 48)
icon_box.pack_start(icon, True, True, 0)
# Create right side layout for text
right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
@ -493,12 +570,47 @@ class MainWindow(Gtk.Window):
desc_label.set_line_wrap_mode(Gtk.WrapMode.WORD)
desc_label.get_style_context().add_class("dim-label")
# Create buttons box
buttons_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
buttons_box.set_spacing(6)
buttons_box.set_margin_top(4)
buttons_box.set_halign(Gtk.Align.END)
# Install button
install_btn = self.create_button(
"Install",
self.on_install_clicked,
app
)
buttons_box.pack_end(install_btn, False, False, 0)
# Details button
details_btn = self.create_button(
"Details",
self.on_details_clicked,
app
)
buttons_box.pack_end(details_btn, False, False, 0)
# Donate button with condition
donate_btn = self.create_button(
"Donate",
self.on_donate_clicked,
app,
lambda x: x.get_details().get('urls', {}).get('donation', '')
)
if donate_btn:
buttons_box.pack_end(donate_btn, False, False, 0)
# Add widgets to right box
right_box.pack_start(title_label, False, False, 0)
right_box.pack_start(desc_label, False, False, 0)
right_box.pack_start(buttons_box, False, True, 0)
# Add separator
separator = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL)
# Add to container
right_box.pack_start(title_label, False, False, 0)
right_box.pack_start(desc_label, False, False, 0)
app_container.pack_start(icon_box, False, False, 0)
app_container.pack_start(right_box, True, True, 0)
self.right_container.pack_start(app_container, False, False, 0)
@ -506,6 +618,31 @@ class MainWindow(Gtk.Window):
self.right_container.show_all() # Show all widgets after adding them
def on_install_clicked(self, button, app):
"""Handle the Install button click"""
details = app.get_details()
print(f"Installing application: {details['name']}")
# Implement installation logic here
# Example:
# Flatpak.install(app_id=details['id'])
def on_details_clicked(self, button, app):
"""Handle the Details button click"""
details = app.get_details()
print(f"Showing details for: {details['name']}")
# Implement details view here
# Could open a new window with extended information
def on_donate_clicked(self, button, app):
"""Handle the Donate button click"""
details = app.get_details()
donation_url = details.get('urls', {}).get('donation', '')
if donation_url:
try:
Gio.AppInfo.launch_default_for_uri(donation_url, None)
except Exception as e:
print(f"Error opening donation URL: {str(e)}")
def select_default_category(self):
# Select Trending by default
if 'collections' in self.category_widgets and self.category_widgets['collections']: