move collection fetching from app to libflatpak-query

This commit is contained in:
GloriousEggroll 2025-03-24 02:00:59 -06:00
parent 5470b46666
commit 1134aca5e0
3 changed files with 59213 additions and 57227 deletions

File diff suppressed because it is too large Load diff

View file

@ -8,17 +8,18 @@ from pathlib import Path
import logging
from enum import IntEnum
import argparse
import urllib.parse
import requests
from urllib.parse import quote_plus
import tempfile
import os
import sys
import json
import time
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Match(IntEnum):
NAME = 1
ID = 2
@ -61,7 +62,7 @@ class AppStreamPackage:
return self.component.get_summary()
@property
def version(self) -> str:
def version(self) -> str|None:
releases = self.component.get_releases_plain()
if releases:
release = releases.index_safe(0)
@ -158,6 +159,34 @@ class AppstreamSearcher:
def __init__(self) -> None:
self.remotes: dict[str, list[AppStreamPackage]] = {}
self.installed = []
self.refresh_progress = 0
# Define category groups and their titles
self.category_groups = {
'system': {
'installed': 'Installed',
'updates': 'Updates',
'repositories': 'Repositories'
},
'collections': {
'trending': 'Trending',
'popular': 'Popular',
'recently-added': 'New',
'recently-updated': 'Updated'
},
'categories': {
'office': 'Productivity',
'graphics': 'Graphics & Photography',
'audiovideo': 'Audio & Video',
'education': 'Education',
'network': 'Networking',
'game': 'Games',
'development': 'Developer Tools',
'science': 'Science',
'system': 'System',
'utility': 'Utilities'
}
}
def add_installation(self, inst: Flatpak.Installation):
"""Add enabled flatpak repositories from Flatpak.Installation"""
@ -226,7 +255,7 @@ class AppstreamSearcher:
return search_results
def get_all_apps(self, repo_name: str = None) -> list[AppStreamPackage]:
def get_all_apps(self, repo_name=None) -> list[AppStreamPackage]:
"""Get all available apps from specified or all repositories"""
all_packages = []
if repo_name:
@ -309,11 +338,198 @@ class AppstreamSearcher:
return updates
def reposearcher():
def fetch_flathub_category_apps(self, category):
"""Fetch applications from Flathub API for the specified category."""
try:
# URL encode the category to handle special characters
encoded_category = quote_plus(category)
# Determine the base URL based on category type
if category in self.category_groups['collections']:
url = f"https://flathub.org/api/v2/collection/{encoded_category}"
else:
url = f"https://flathub.org/api/v2/collection/category/{encoded_category}"
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
# If this is a collections category, save it to our collections database
if category in self.category_groups['collections']:
if not hasattr(self, 'collections_db'):
self.collections_db = []
self.collections_db.append({
'category': category,
'data': data
})
return data
else:
print(f"Failed to fetch apps: Status code {response.status_code}")
return None
except requests.RequestException as e:
print(f"Error fetching apps: {str(e)}")
return None
def save_collections_data(self, filename='collections_data.json'):
"""Save all collected collections data to a JSON file."""
if not hasattr(self, 'collections_db') or not self.collections_db:
return
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.collections_db, f, indent=2, ensure_ascii=False)
except IOError as e:
print(f"Error saving collections data: {str(e)}")
def update_collection_results(self, new_collection_results):
"""Update search results by replacing existing items and adding new ones."""
# Create a set of existing app_ids for efficient lookup
existing_app_ids = {app.id for app in self.collection_results}
# Create a list to store the updated results
updated_results = []
# First add all existing results
updated_results.extend(self.collection_results)
# Add new results, replacing any existing ones
for new_result in new_collection_results:
app_id = new_result.id
if app_id in existing_app_ids:
# Replace existing result
for i, existing in enumerate(updated_results):
if existing.id == app_id:
updated_results[i] = new_result
break
else:
# Add new result
updated_results.append(new_result)
self.collection_results = updated_results
def refresh_data(self):
# make sure to reset these to empty before refreshing.
self.category_results = [] # Initialize empty list
self.collection_results = [] # Initialize empty list
self.installed_results = [] # Initialize empty list
self.updates_results = [] # Initialize empty list
total_categories = sum(len(categories) for categories in self.category_groups.values())
current_category = 0
# Search for each app in local repositories
searcher = get_reposearcher()
json_path = "collections_data.json"
search_result = []
for group_name, categories in self.category_groups.items():
# Process categories one at a time to keep GUI responsive
for category, title in categories.items():
if category not in self.category_groups['system']:
# Preload the currently saved collections data first
try:
with open(json_path, 'r', encoding='utf-8') as f:
collections_data = json.load(f)
for collection in collections_data:
if collection['category'] == category:
apps = [app['app_id'] for app in collection['data'].get('hits', [])]
for app_id in apps:
search_result = searcher.search_flatpak(app_id, 'flathub')
self.collection_results.extend(search_result)
except (IOError, json.JSONDecodeError) as e:
print(f"Error loading collections data: {str(e)}")
# Try to get apps from Flathub API if internet is available
if check_internet():
# Get modification time in seconds since epoch
mod_time = os.path.getmtime(json_path)
# Calculate 24 hours in seconds
hours_24 = 24 * 3600
# Check if file is older than 24 hours
if (time.time() - mod_time) > hours_24:
api_data = self.fetch_flathub_category_apps(category)
if api_data:
apps = api_data['hits']
for app in apps:
app_id = app['app_id']
# Search for the app in local repositories
search_result = searcher.search_flatpak(app_id, 'flathub')
self.category_results.extend(search_result)
else:
apps = searcher.get_all_apps('flathub')
for app in apps:
details = app.get_details()
if category in details['categories']:
search_result = searcher.search_flatpak(details['name'], 'flathub')
self.category_results.extend(search_result)
current_category += 1
# Update progress bar
self.refresh_progress = (current_category / total_categories) * 100
else:
if "installed" in category:
installed_apps = searcher.get_installed_apps()
for app_id, repo_name, repo_type in installed_apps:
parts = app_id.split('/')
app_id = parts[parts.index('app') + 1]
if repo_name:
# Extend the existing list instead of creating a new one
search_result = searcher.search_flatpak(app_id, repo_name)
self.installed_results.extend(search_result)
elif "updates" in category:
updates = searcher.check_updates()
for repo_name, app_id, repo_type in updates:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.updates_results.extend(search_result)
self.save_collections_data()
# load collections from json file again
# we do this in one go after all of the data from each category has been saved to the json file.
# this time we update entries that already exist and add new entries that don't exist.
for group_name, categories in self.category_groups.items():
for category, title in categories.items():
if category in self.category_groups['collections']:
try:
with open(json_path, 'r', encoding='utf-8') as f:
collections_data = json.load(f)
for collection in collections_data:
if collection['category'] == category:
apps = [app['app_id'] for app in collection['data'].get('hits', [])]
new_results = []
for app_id in apps:
search_result = searcher.search_flatpak(app_id, 'flathub')
new_results.extend(search_result)
self.update_collection_results(new_results)
except (IOError, json.JSONDecodeError) as e:
print(f"Error loading collections data: {str(e)}")
# make sure to reset these to empty before refreshing.
return self.category_results, self.collection_results, self.installed_results, self.updates_results
def get_refresh_progress():
searcher = AppstreamSearcher()
searcher.add_installation(Flatpak.Installation.new_system())
return searcher
def get_reposearcher():
searcher = AppstreamSearcher()
searcher.add_installation(Flatpak.Installation.new_system())
return searcher
def check_internet():
"""Check if internet connection is available."""
try:
requests.head('https://flathub.org', timeout=3)
return True
except requests.ConnectionError:
return False
def repotoggle(repo, bool=True):
"""
Enable or disable a Flatpak repository
@ -505,7 +721,7 @@ def main():
return
# Create AppstreamSearcher instance
searcher = reposearcher()
searcher = get_reposearcher()
if args.list_installed:
installed_apps = searcher.get_installed_apps()

238
main.py
View file

@ -5,12 +5,10 @@ gi.require_version("Gtk", "3.0")
gi.require_version("GLib", "2.0")
gi.require_version("Flatpak", "1.0")
from gi.repository import Gtk, Gio, Gdk, GLib
import requests
from urllib.parse import quote_plus
import libflatpak_query
import json
import os
import time
from queue import Queue
import threading
class MainWindow(Gtk.Window):
def __init__(self):
@ -183,159 +181,66 @@ class MainWindow(Gtk.Window):
self.selected_repo = dropdown.get_model()[active_index][0]
print(f"Selected repository: {self.selected_repo}")
def update_collection_results(self, new_collection_results):
"""Update search results by replacing existing items and adding new ones."""
# Create a set of existing app_ids for efficient lookup
existing_app_ids = {app.id for app in self.collection_results}
# Create a list to store the updated results
updated_results = []
# First add all existing results
updated_results.extend(self.collection_results)
# Add new results, replacing any existing ones
for new_result in new_collection_results:
app_id = new_result.id
if app_id in existing_app_ids:
# Replace existing result
for i, existing in enumerate(updated_results):
if existing.id == app_id:
updated_results[i] = new_result
break
else:
# Add new result
updated_results.append(new_result)
self.collection_results = updated_results
def refresh_data(self):
# make sure to reset these to empty before refreshing.
self.category_results = [] # Initialize empty list
self.collection_results = [] # Initialize empty list
self.installed_results = [] # Initialize empty list
self.updates_results = [] # Initialize empty list
total_categories = sum(len(categories) for categories in self.category_groups.values())
current_category = 0
msg = "Fetching metadata, please wait..."
# Create dialog and progress bar
dialog = Gtk.Dialog(
title=msg,
title="Fetching metadata, please wait...",
parent=self,
modal=True,
destroy_with_parent=True
)
# Set dialog size
dialog.set_size_request(400, 100)
# Create progress bar
progress_bar = Gtk.ProgressBar()
progress_bar.set_text(msg)
# Add progress bar to dialog
progress_bar.set_text("Initializing...")
progress_bar.set_show_text(True)
dialog.vbox.pack_start(progress_bar, True, True, 0)
dialog.vbox.set_spacing(12)
# Show the dialog and all its children
# Show the dialog
dialog.show_all()
# Search for each app in local repositories
searcher = libflatpak_query.reposearcher()
searcher = libflatpak_query.get_reposearcher()
json_path = "collections_data.json"
search_result = []
for group_name, categories in self.category_groups.items():
# Process categories one at a time to keep GUI responsive
for category, title in categories.items():
if category not in self.category_groups['system']:
# Preload the currently saved collections data first
try:
with open(json_path, 'r', encoding='utf-8') as f:
collections_data = json.load(f)
for collection in collections_data:
if collection['category'] == category:
apps = [app['app_id'] for app in collection['data'].get('hits', [])]
for app_id in apps:
search_result = searcher.search_flatpak(app_id, 'flathub')
self.collection_results.extend(search_result)
except (IOError, json.JSONDecodeError) as e:
print(f"Error loading collections data: {str(e)}")
# Define thread target function
def refresh_target():
try:
category_results, collection_results, installed_results, updates_results = searcher.refresh_data()
self.category_results = category_results
self.collection_results = collection_results
self.installed_results = installed_results
self.updates_results = updates_results
except Exception as e:
message_type = Gtk.MessageType.ERROR
dialog = Gtk.MessageDialog(
transient_for=None, # Changed from self
modal=True,
destroy_with_parent=True,
message_type=message_type,
buttons=Gtk.ButtonsType.OK,
text=f"Error updating progress: {str(e)}"
)
dialog.run()
dialog.destroy()
# Try to get apps from Flathub API if internet is available
if self.check_internet():
# Get modification time in seconds since epoch
mod_time = os.path.getmtime(json_path)
# Calculate 24 hours in seconds
hours_24 = 24 * 3600
# Check if file is older than 24 hours
if (time.time() - mod_time) > hours_24:
api_data = self.fetch_flathub_category_apps(category)
if api_data:
apps = api_data['hits']
# Start the refresh thread
refresh_thread = threading.Thread(target=refresh_target)
refresh_thread.start()
def update_progress():
while refresh_thread.is_alive():
progress_bar.set_text("Fetching...")
progress = searcher.refresh_progress
progress_bar.set_fraction(progress / 100)
return True
else:
progress_bar.set_fraction(100 / 100)
dialog.destroy()
for app in apps:
app_id = app['app_id']
# Search for the app in local repositories
search_result = searcher.search_flatpak(app_id, 'flathub')
self.category_results.extend(search_result)
else:
apps = searcher.get_all_apps('flathub')
for app in apps:
details = app.get_details()
if category in details['categories']:
search_result = searcher.search_flatpak(details['name'], 'flathub')
self.category_results.extend(search_result)
current_category += 1
# Update progress bar
progress = (current_category / total_categories) * 100
progress_bar.set_fraction(progress / 100)
# Force GTK to process events
while Gtk.events_pending():
Gtk.main_iteration_do(False)
else:
if "installed" in category:
installed_apps = searcher.get_installed_apps()
for app_id, repo_name, repo_type in installed_apps:
parts = app_id.split('/')
app_id = parts[parts.index('app') + 1]
if repo_name:
# Extend the existing list instead of creating a new one
search_result = searcher.search_flatpak(app_id, repo_name)
self.installed_results.extend(search_result)
elif "updates" in category:
updates = searcher.check_updates()
for repo_name, app_id, repo_type in updates:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.updates_results.extend(search_result)
self.save_collections_data()
# load collections from json file again
# we do this in one go after all of the data from each category has been saved to the json file.
# this time we update entries that already exist and add new entries that don't exist.
for group_name, categories in self.category_groups.items():
for category, title in categories.items():
if category in self.category_groups['collections']:
try:
with open(json_path, 'r', encoding='utf-8') as f:
collections_data = json.load(f)
for collection in collections_data:
if collection['category'] == category:
apps = [app['app_id'] for app in collection['data'].get('hits', [])]
new_results = []
for app_id in apps:
search_result = searcher.search_flatpak(app_id, 'flathub')
new_results.extend(search_result)
self.update_collection_results(new_results)
except (IOError, json.JSONDecodeError) as e:
print(f"Error loading collections data: {str(e)}")
dialog.destroy()
# Start the progress update timer
GLib.timeout_add_seconds(0.5, update_progress)
dialog.run()
if not refresh_thread.is_alive() and dialog.is_active():
dialog.destroy()
def create_panels(self):
# Check if panels already exist
@ -665,59 +570,6 @@ class MainWindow(Gtk.Window):
return self.right_panel
def check_internet(self):
"""Check if internet connection is available."""
try:
requests.head('https://flathub.org', timeout=3)
return True
except requests.ConnectionError:
return False
def fetch_flathub_category_apps(self, category):
"""Fetch applications from Flathub API for the specified category."""
try:
# URL encode the category to handle special characters
encoded_category = quote_plus(category)
# Determine the base URL based on category type
if category in self.category_groups['collections']:
url = f"https://flathub.org/api/v2/collection/{encoded_category}"
else:
url = f"https://flathub.org/api/v2/collection/category/{encoded_category}"
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
# If this is a collections category, save it to our collections database
if category in self.category_groups['collections']:
if not hasattr(self, 'collections_db'):
self.collections_db = []
self.collections_db.append({
'category': category,
'data': data
})
return data
else:
print(f"Failed to fetch apps: Status code {response.status_code}")
return None
except requests.RequestException as e:
print(f"Error fetching apps: {str(e)}")
return None
def save_collections_data(self, filename='collections_data.json'):
"""Save all collected collections data to a JSON file."""
if not hasattr(self, 'collections_db') or not self.collections_db:
return
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.collections_db, f, indent=2, ensure_ascii=False)
except IOError as e:
print(f"Error saving collections data: {str(e)}")
# Create and connect buttons
def create_button(self, callback, app, label=None, condition=None):
"""Create a button with optional visibility condition"""