initial commit

This commit is contained in:
GloriousEggroll 2025-03-22 10:28:50 -06:00
parent 70987e0e9c
commit f072be3748
4 changed files with 798 additions and 0 deletions

3
README Normal file
View file

@ -0,0 +1,3 @@
This is very much currently WIP. Right now it only fetches data from appstream.
sqlite database is temporary. no need to store things when they are already stored in appstream to begin with

BIN
flatshop_db Normal file

Binary file not shown.

287
libflatpak_query.py Executable file
View file

@ -0,0 +1,287 @@
#!/usr/bin/env python3
import gi
gi.require_version("AppStream", "1.0")
gi.require_version("Flatpak", "1.0")
from gi.repository import Flatpak, GLib, Gio, AppStream
from pathlib import Path
import logging
from enum import IntEnum
from pathlib import Path
import argparse
import sys
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Match(IntEnum):
NAME = 1
ID = 2
SUMMARY = 3
NONE = 4
class AppStreamPackage:
def __init__(self, comp: AppStream.Component, remote: Flatpak.Remote) -> None:
self.component: AppStream.Component = comp
self.remote: Flatpak.Remote = remote
self.repo_name: str = remote.get_name()
bundle: AppStream.Bundle = comp.get_bundle(AppStream.BundleKind.FLATPAK)
self.flatpak_bundle: str = bundle.get_id()
self.match = Match.NONE
# Get icon and description
self.icon_url = self._get_icon_url()
self.icon_path_128 = self._get_icon_cache_path("128x128")
self.icon_path_64 = self._get_icon_cache_path("64x64")
self.icon_filename = self._get_icon_filename()
self.description = self.component.get_description()
# Get URLs from the component
self.urls = self._get_urls()
self.developer = self.component.get_developer().get_name()
self.categories = self._get_categories()
@property
def id(self) -> str:
return self.component.get_id()
@property
def name(self) -> str:
return self.component.get_name()
@property
def summary(self) -> str:
return self.component.get_summary()
@property
def version(self) -> str:
releases = self.component.get_releases_plain()
if releases:
release = releases.index_safe(0)
if release:
version = release.get_version()
return version
return None
def _get_icon_url(self) -> str:
"""Get the remote icon URL from the component"""
icons = self.component.get_icons()
# Find the first REMOTE icon
remote_icon = next((icon for icon in icons if icon.get_kind() == AppStream.IconKind.REMOTE), None)
return remote_icon.get_url() if remote_icon else ""
def _get_icon_filename(self) -> str:
"""Get the cached icon filename from the component"""
icons = self.component.get_icons()
# Find the first CACHED icon
cached_icon = next((icon for icon in icons if icon.get_kind() == AppStream.IconKind.CACHED), None)
return cached_icon.get_filename() if cached_icon else ""
def _get_icon_cache_path(self, size: str) -> str:
# Remove the file:// prefix
icon_filename = self._get_icon_filename()
# Appstream icon cache path for the flatpak repo queried
icon_cache_path = Path(self.remote.get_appstream_dir().get_path() + "/icons/flatpak/" + size + "/")
return str(icon_cache_path)
def _get_urls(self) -> dict:
"""Get URLs from the component"""
urls = {
'donation': self._get_url('donation'),
'homepage': self._get_url('homepage'),
'bugtracker': self._get_url('bugtracker')
}
return urls
def _get_url(self, url_kind: str) -> str:
"""Helper method to get a specific URL type"""
# Convert string to AppStream.UrlKind enum
url_kind_enum = getattr(AppStream.UrlKind, url_kind.upper())
url = self.component.get_url(url_kind_enum)
if url:
return url
return ""
def _get_categories(self) -> list:
categories_fetch = self.component.get_categories()
categories = []
for category in categories_fetch:
categories.append(category.lower())
return categories
def search(self, keyword: str) -> Match:
"""Search for keyword in package details"""
if keyword in self.name.lower():
return Match.NAME
elif keyword in self.id.lower():
return Match.ID
elif keyword in self.summary.lower():
return Match.SUMMARY
else:
return Match.NONE
def __str__(self) -> str:
return f"{self.name} - {self.summary} ({self.flatpak_bundle})"
def get_details(self) -> dict:
"""Get all package details including icon and description"""
return {
"name": self.name,
"id": self.id,
"summary": self.summary,
"description": self.description,
"version": self.version,
"icon_url": self.icon_url,
"icon_path_128": self.icon_path_128,
"icon_path_64": self.icon_path_64,
"icon_filename": self.icon_filename,
"urls": self.urls,
"developer": self.developer,
#"architectures": self.architectures,
"categories": self.categories,
"bundle_id": self.flatpak_bundle,
"match_type": self.match.name,
"repo": self.repo_name
}
class AppstreamSearcher:
"""Flatpak AppStream Package seacher"""
def __init__(self) -> None:
self.remotes: dict[str, list[AppStreamPackage]] = {}
self.installed = []
def add_installation(self, inst: Flatpak.Installation):
"""Add enabled flatpak repositories from Flatpak.Installation"""
remotes = inst.list_remotes()
for remote in remotes:
if not remote.get_disabled():
self.add_remote(remote, inst)
def add_remote(self, remote: Flatpak.Remote, inst: Flatpak.Installation):
"""Add packages for a given Flatpak.Remote"""
remote_name = remote.get_name()
self.installed.extend([ref.format_ref() for ref in inst.list_installed_refs_by_kind(Flatpak.RefKind.APP)])
if remote_name not in self.remotes:
self.remotes[remote_name] = self._load_appstream_metadata(remote)
def _load_appstream_metadata(self, remote: Flatpak.Remote) -> list[AppStreamPackage]:
"""load AppStrean metadata and create AppStreamPackage objects"""
packages = []
metadata = AppStream.Metadata.new()
metadata.set_format_style(AppStream.FormatStyle.CATALOG)
appstream_file = Path(remote.get_appstream_dir().get_path() + "/appstream.xml.gz")
if appstream_file.exists():
metadata.parse_file(Gio.File.new_for_path(appstream_file.as_posix()), AppStream.FormatKind.XML)
components: AppStream.ComponentBox = metadata.get_components()
i = 0
for i in range(components.get_size()):
component = components.index_safe(i)
if component.get_kind() == AppStream.ComponentKind.DESKTOP_APP:
bundle = component.get_bundle(AppStream.BundleKind.FLATPAK).get_id()
if bundle not in self.installed:
packages.append(AppStreamPackage(component, remote))
return packages
else:
logger.debug(f"AppStream file not found: {appstream_file}")
return []
def search_flatpak_repo(self, keyword: str, repo_name: str) -> list[AppStreamPackage]:
search_results = []
packages = self.remotes[repo_name]
for package in packages:
found = package.search(keyword)
if found != Match.NONE:
logger.debug(f" found : {package} match: {found}")
package.match = found
search_results.append(package)
return search_results
def search_flatpak(self, keyword: str, repo_name=None) -> list[AppStreamPackage]:
"""Search packages matching a keyword"""
search_results = []
keyword = keyword.lower()
if repo_name:
search_results = self.search_flatpak_repo(keyword, repo_name)
else:
for remote_name in self.remotes.keys():
results = self.search_flatpak_repo(keyword, remote_name)
for result in results:
search_results.append(result)
return search_results
def main():
"""Main function demonstrating Flatpak information retrieval"""
parser = argparse.ArgumentParser(description='Search Flatpak packages')
parser.add_argument('--id', help='Application ID to search for')
parser.add_argument('--repo', help='Filter results to specific repository')
args = parser.parse_args()
app_id = args.id
repo_filter = args.repo
if not app_id:
print("Usage: python flatpak_info.py --<option> <value>")
print("options: --id --repo")
print("example (app search single repo): --id net.lutris.Lutris --repo flathub")
print("example (app search all repos): --id net.lutris.Lutris")
return
# Create AppstreamSearcher instance
searcher = AppstreamSearcher()
# Add installations
installation = Flatpak.Installation.new_system(None)
searcher.add_installation(installation)
if app_id == "" or len(app_id) < 3:
self._clear()
return
logger.debug(f"(flatpak_search) key: {app_id}")
# Now you can call search method on the searcher instance
if repo_filter:
search_results = searcher.search_flatpak(app_id, repo_filter)
else:
search_results = searcher.search_flatpak(app_id)
if search_results:
for package in search_results:
details = package.get_details()
print(f"Name: {details['name']}")
print(f"ID: {details['id']}")
print(f"Summary: {details['summary']}")
print(f"Description: {details['description']}")
print(f"Version: {details['version']}")
print(f"Icon URL: {details['icon_url']}")
print(f"Icon PATH 128x128: {details['icon_path_128']}")
print(f"Icon PATH 64x64: {details['icon_path_64']}")
print(f"Icon FILE: {details['icon_filename']}")
print(f"Developer: {details['developer']}")
print(f"Categories: {details['categories']}")
urls = details['urls']
print(f"Donation URL: {urls['donation']}")
print(f"Homepage URL: {urls['homepage']}")
print(f"Bug Tracker URL: {urls['bugtracker']}")
print(f"Bundle ID: {details['bundle_id']}")
print(f"Match Type: {details['match_type']}")
print(f"Repo: {details['repo']}")
print("-" * 50)
return
if __name__ == "__main__":
main()

508
main.py Executable file
View file

@ -0,0 +1,508 @@
#!/usr/bin/python3
import gi
gi.require_version("Gtk", "3.0")
gi.require_version("GLib", "2.0")
gi.require_version("Flatpak", "1.0")
from gi.repository import Gtk, Gio, Gdk
import sqlite3
import requests
from urllib.parse import quote_plus
import libflatpak_query
from libflatpak_query import AppstreamSearcher, Flatpak
import json
class MainWindow(Gtk.Window):
def __init__(self):
super().__init__()
# Set window size
self.set_default_size(1280, 720)
# Define category groups and their titles
self.category_groups = {
'collections': {
'trending': 'Trending',
'popular': 'Popular',
'recently-added': 'New',
'recently-updated': 'Updated'
},
'categories': {
'office': 'Productivity',
'graphics': 'Graphics & Photography',
'audiovideo': 'Audio & Video',
'education': 'Education',
'network': 'Networking',
'game': 'Games',
'development': 'Developer Tools',
'science': 'Science',
'system': 'System',
'utility': 'Utilities'
}
}
# Define subcategories for Games
self.subcategories = {
'Emulator': 'Emulators',
'Launcher': 'Game Launchers',
'Tool': 'Game Tools'
}
# Add CSS provider for custom styling
css_provider = Gtk.CssProvider()
css_provider.load_from_data("""
.dark-header {
background-color: #333333;
padding: 6px;
margin: 0;
}
.dark-category-button {
border: 0px;
padding: 6px;
margin: 0;
}
.dark-category-button,
.dark-category-button:hover,
.dark-category-button:focus,
.dark-category-button:active {
background: none;
border: none;
padding: 0;
outline: none;
box-shadow: none;
transition: none;
-webkit-appearance: none;
}
""")
# Add CSS provider to the default screen
Gtk.StyleContext.add_provider_for_screen(
Gdk.Screen.get_default(),
css_provider,
600
)
# Create main layout
self.main_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
self.add(self.main_box)
# Create panels
self.create_panels()
self.refresh_database()
# Select Trending by default
self.select_default_category()
def refresh_database(self):
# Try to get apps from Flathub API if internet is available
if self.check_internet():
total_categories = sum(len(categories) for categories in self.category_groups.values())
current_category = 0
msg = "Updating metadata, please wait..."
dialog = Gtk.Dialog(
title=msg,
parent=self,
modal=True,
destroy_with_parent=True
)
# Set dialog size
dialog.set_size_request(400, 100)
# Create progress bar
progress_bar = Gtk.ProgressBar()
progress_bar.set_text(msg)
# Add progress bar to dialog
dialog.vbox.pack_start(progress_bar, True, True, 0)
dialog.vbox.set_spacing(12)
# Show the dialog and all its children
dialog.show_all()
for group_name, categories in self.category_groups.items():
# Process categories one at a time to keep GUI responsive
for category, title in categories.items():
api_data = self.fetch_flathub_category_apps(category)
if api_data:
apps = api_data['hits']
# Create database if it doesn't exist
db_path = 'flatshop_db'
create_repo_table(db_path, 'flathub')
# Search for each app in local repositories
searcher = AppstreamSearcher()
searcher.add_installation(Flatpak.Installation.new_user())
for app in apps:
app_id = app['app_id']
# Search for the app in local repositories
search_results = searcher.search_flatpak(app_id, 'flathub')
# Store category results in database
self.update_database(category, db_path, app_id, search_results)
current_category += 1
# Update progress bar
progress = (current_category / total_categories) * 100
progress_bar.set_fraction(progress / 100)
# Force GTK to process events
while Gtk.events_pending():
Gtk.main_iteration_do(False)
dialog.destroy()
def create_panels(self):
# Create left panel with grouped categories
self.create_grouped_category_panel("Categories", self.category_groups)
# Create right panel
self.right_panel = self.create_applications_panel("Applications")
def create_grouped_category_panel(self, title, groups):
# Create scrollable area
scrolled_window = Gtk.ScrolledWindow()
scrolled_window.set_size_request(300, -1) # Set fixed width
scrolled_window.set_hexpand(False) # Don't expand horizontally
scrolled_window.set_vexpand(True) # Expand vertically
# Create container for categories
container = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
container.set_spacing(6)
container.set_border_width(6)
container.set_halign(Gtk.Align.FILL) # Fill horizontally
container.set_valign(Gtk.Align.START) # Align to top
# Dictionary to store buttons grouped by category
self.category_buttons = {}
# Add group headers and buttons
for group_name, categories in groups.items():
# Create a box for the header
header_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
header_box.get_style_context().add_class("dark-header")
header_box.set_hexpand(True) # Make the box expand horizontally
# Create the label
group_header = Gtk.Label(label=group_name.upper())
group_header.get_style_context().add_class("title-2")
group_header.set_halign(Gtk.Align.START)
# Add the label to the box
header_box.pack_start(group_header, False, False, 0)
# Add the box to the container
container.pack_start(header_box, False, False, 0)
# Store buttons for this group
self.category_buttons[group_name] = []
# Add categories in the group
for category, display_title in categories.items():
# Create clickable button for each category
button = Gtk.ToggleButton(label=display_title)
button.get_style_context().remove_class("dark-header")
button.get_style_context().add_class("dark-category-button")
button.set_halign(Gtk.Align.START) # Left align button
button.set_hexpand(True) # Expand horizontally
button.connect("clicked", self.on_category_button_clicked, category, group_name)
# Store button in group
self.category_buttons[group_name].append(button)
container.pack_start(button, False, False, 2)
# Add container to scrolled window
scrolled_window.add(container)
# Pack the scrolled window directly into main box
self.main_box.pack_start(scrolled_window, False, False, 0)
def on_category_button_clicked(self, button, category, group):
# Uncheck all other buttons in the same group
for btn in self.category_buttons[group]:
if btn != button:
btn.set_active(False)
self.show_category_apps(category)
def create_applications_panel(self, title):
# Create right panel
self.right_panel = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.right_panel.set_size_request(-1, -1)
# Create scrollable area
scrolled_window = Gtk.ScrolledWindow()
scrolled_window.set_hexpand(True)
scrolled_window.set_vexpand(True)
# Create container for applications
self.right_container = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.right_container.set_spacing(6)
self.right_container.set_border_width(6)
scrolled_window.add(self.right_container)
self.main_box.pack_end(scrolled_window, True, True, 0)
return self.right_container
def check_internet(self):
"""Check if internet connection is available."""
try:
requests.head('https://flathub.org', timeout=3)
return True
except requests.ConnectionError:
return False
def fetch_flathub_category_apps(self, category):
"""Fetch applications from Flathub API for the specified category."""
try:
# URL encode the category to handle special characters
encoded_category = quote_plus(category)
# Determine the base URL based on category type
if category in self.category_groups['collections']:
url = f"https://flathub.org/api/v2/collection/{encoded_category}"
else:
url = f"https://flathub.org/api/v2/collection/category/{encoded_category}"
response = requests.get(url, timeout=10)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to fetch apps: Status code {response.status_code}")
return None
except requests.RequestException as e:
print(f"Error fetching apps: {str(e)}")
return None
def update_collection_status(self, category, db_path, app_id):
"""Updates the trending status for a specific application."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
category = category.replace('-', '_').lower()
# Use string formatting to properly identify the column name
query = f"""
UPDATE flathub
SET {category} = 1
WHERE id = '{app_id}'
"""
try:
cursor.execute(query)
conn.commit()
print(f"Collection {category} updated for app_id: {app_id}")
except sqlite3.Error as e:
print(f"Error updating database: {str(e)}")
finally:
conn.close()
def update_database(self, category, db_path, app_id, search_results):
"""Update database."""
# Process each app
for result in search_results:
app_data = result.get_details()
# Store app data
if category in self.category_groups['categories']:
store_app_data(db_path, 'flathub', app_data)
if category in self.category_groups['collections']:
self.update_collection_status(category, db_path, app_id)
def show_category_apps(self, category):
# Clear existing content
for child in self.right_container.get_children():
child.destroy()
# Now pull the new info from our local database
try:
conn = sqlite3.connect('flatshop_db')
cursor = conn.cursor()
if category in self.category_groups['categories']:
cursor.execute("""
SELECT id, name, summary, description, icon_path_64
FROM flathub
WHERE ? IN (SELECT value FROM json_each(categories))
ORDER BY name ASC
""", (category,))
elif category in self.category_groups['collections']:
category = category.replace('-', '_').lower()
# Use string formatting to properly identify the column name
query = f"""
SELECT id, name, summary, description, icon_path_64
FROM flathub
WHERE {category} = 1
ORDER BY name ASC
"""
print(query)
cursor.execute(query)
# Display each application
for id, name, summary, description, icon_path_64 in cursor.fetchall():
# Create application container
app_container = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
app_container.set_spacing(12)
app_container.set_margin_top(6)
app_container.set_margin_bottom(6)
# Add icon placeholder
icon_box = Gtk.Box()
icon_box.set_size_request(148, -1)
# Create and add the icon
icon = Gtk.Image.new_from_file(f"{icon_path_64}")
icon.set_size_request(48, 48) # Set a reasonable size for the icon
icon_box.pack_start(icon, True, True, 0) # Add icon to the box
# Create right side layout for text
right_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
right_box.set_spacing(4)
right_box.set_hexpand(True)
# Add title
title_label = Gtk.Label(label=name)
title_label.get_style_context().add_class("title-1")
title_label.set_halign(Gtk.Align.START)
title_label.set_hexpand(True)
# Add summary
desc_label = Gtk.Label(label=summary)
desc_label.set_halign(Gtk.Align.START)
desc_label.set_hexpand(True)
desc_label.set_line_wrap(True)
desc_label.set_line_wrap_mode(Gtk.WrapMode.WORD)
desc_label.get_style_context().add_class("dim-label")
# Add separator
separator = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL)
# Add to container
right_box.pack_start(title_label, False, False, 0)
right_box.pack_start(desc_label, False, False, 0)
app_container.pack_start(icon_box, False, False, 0)
app_container.pack_start(right_box, True, True, 0)
self.right_container.pack_start(app_container, False, False, 0)
self.right_container.pack_start(separator, False, False, 0)
except sqlite3.Error as e:
error_label = Gtk.Label(label=f"Error loading applications: {str(e)}")
error_label.get_style_context().add_class("error-label")
error_label.set_halign(Gtk.Align.CENTER)
self.right_container.pack_start(error_label, False, False, 0)
finally:
conn.close()
self.right_container.show_all() # Show all widgets after adding them
def select_default_category(self):
# Select Trending by default
if 'collections' in self.category_buttons and self.category_buttons['collections']:
trending_button = self.category_buttons['collections'][0]
trending_button.set_active(True)
self.on_category_button_clicked(trending_button, 'trending', 'collections')
def create_repo_table(db_path, repo_name):
"""Create a table for storing app data from a specific repository."""
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create table with all fields from AppStreamPackage
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {repo_name} (
id TEXT PRIMARY KEY,
name TEXT,
summary TEXT,
description TEXT,
version TEXT,
icon_url TEXT,
icon_path_128 TEXT,
icon_path_64 TEXT,
icon_filename TEXT,
developer TEXT,
categories TEXT,
bundle_id TEXT,
repo_name TEXT,
match_type TEXT,
urls TEXT,
trending INTEGER DEFAULT 0,
popular INTEGER DEFAULT 0,
recently_added INTEGER DEFAULT 0,
recently_updated INTEGER DEFAULT 0,
FOREIGN KEY (id) REFERENCES applications (app_id)
)
""")
conn.commit()
return True
except sqlite3.Error as e:
print(f"Error creating table: {str(e)}")
return False
finally:
conn.close()
def store_app_data(db_path, repo_name, app_data):
"""Store app data in the SQLite database."""
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Convert URLs dictionary to JSON string for storage
urls_json = json.dumps(app_data['urls'])
categories_json = json.dumps(app_data['categories'])
# Insert data into the repository table
cursor.execute(f"""
INSERT OR REPLACE INTO {repo_name} (
id, name, summary, description, version,
icon_url, icon_path_128, icon_path_64,
icon_filename, developer, categories,
bundle_id, repo_name, match_type, urls,
trending, popular, recently_added, recently_updated
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
app_data['id'],
app_data['name'],
app_data['summary'],
app_data['description'],
app_data['version'],
app_data['icon_url'],
app_data['icon_path_128'],
app_data['icon_path_64'],
app_data['icon_filename'],
app_data['developer'],
categories_json,
app_data['bundle_id'],
repo_name,
app_data['match_type'],
urls_json,
0,
0,
0,
0
))
conn.commit()
return True
except sqlite3.Error as e:
print(f"Error storing app data: {str(e)}")
return False
finally:
conn.close()
def main():
app = MainWindow()
app.connect("destroy", Gtk.main_quit)
app.show_all()
Gtk.main()
if __name__ == "__main__":
main()