flatpost/libflatpak_query.py
2025-03-28 01:29:03 -06:00

1077 lines
40 KiB
Python
Executable file

#!/usr/bin/env python3
### Documentation largely taken from:
###
### 1. https://lazka.github.io/pgi-docs/Flatpak-1.0
### 2. https://flathub.org/api/v2/docs#/
###
### Classes AppStreamPackage and AppStreamSearcher extended from original by Tim Lauridsen at:
###
### https://github.com/timlau/yumex-ng/blob/main/yumex/backend/flatpak/search.py
# Original GPL v3 Code Copyright:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2024 Tim Lauridsen
#
# Modifications copyright notice
# Copyright (C) 2025 Thomas Crider
#
# Original code has been completely removed except
# AppStreamPackage and AppStreamSearcher classes
# which have been modified and extended.
import gi
gi.require_version("AppStream", "1.0")
gi.require_version("Flatpak", "1.0")
from gi.repository import Flatpak, GLib, Gio, AppStream
from pathlib import Path
import logging
from enum import IntEnum
import argparse
import requests
from urllib.parse import quote_plus, urlparse
import tempfile
import os
import sys
import json
import time
# Set up logging: configure the root logger at INFO level and create the
# module-level logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Match(IntEnum):
    """Identifies which package field a search keyword was found in.

    AppStreamPackage.search() returns one of these members; NONE means the
    keyword was not found in the name, the ID or the summary.
    """

    NAME = 1     # keyword found in the package name
    ID = 2       # keyword found in the component ID
    SUMMARY = 3  # keyword found in the summary
    NONE = 4     # keyword not found
class AppStreamPackage:
    """One AppStream component together with the Flatpak remote it came from.

    The constructor eagerly extracts icon information, URLs, developer name
    and categories from the component; name, ID, summary, version and kind are
    read from the component on every access.
    """

    def __init__(self, comp: AppStream.Component, remote: Flatpak.Remote) -> None:
        """Wrap ``comp`` (a component of ``remote``'s AppStream catalog)."""
        self.component: AppStream.Component = comp
        self.remote: Flatpak.Remote = remote
        self.repo_name: str = remote.get_name()
        bundle: AppStream.Bundle = comp.get_bundle(AppStream.BundleKind.FLATPAK)
        # get_bundle() returns None when the component declares no Flatpak
        # bundle; fall back to an empty ref instead of raising AttributeError.
        self.flatpak_bundle: str = bundle.get_id() if bundle else ""
        self.match = Match.NONE
        # Get icon and description
        self.icon_url = self._get_icon_url()
        self.icon_path_128 = self._get_icon_cache_path("128x128")
        self.icon_path_64 = self._get_icon_cache_path("64x64")
        self.icon_filename = self._get_icon_filename()
        self.description = self.component.get_description()
        # Get URLs from the component
        self.urls = self._get_urls()
        developer = self.component.get_developer()
        self.developer = developer.get_name() if developer else None
        self.categories = self._get_categories()

    @property
    def id(self) -> str:
        """Component ID as reported by AppStream."""
        return self.component.get_id()

    @property
    def name(self) -> str:
        """Component name as reported by AppStream."""
        return self.component.get_name()

    @property
    def summary(self) -> str:
        """Component summary as reported by AppStream."""
        return self.component.get_summary()

    @property
    def version(self) -> str | None:
        """Version of the first listed release, or None when there is none."""
        releases = self.component.get_releases_plain()
        if not releases:
            return None
        release = releases.index_safe(0)
        return release.get_version() if release else None

    @property
    def kind(self) -> str:
        """String form of the component kind."""
        return str(self.component.get_kind())

    def _first_icon_of_kind(self, icon_kind):
        """Return the first icon of ``icon_kind``, or None if there is none."""
        for icon in self.component.get_icons():
            if icon.get_kind() == icon_kind:
                return icon
        return None

    def _get_icon_url(self) -> str:
        """Get the remote icon URL from the component ("" if none)."""
        remote_icon = self._first_icon_of_kind(AppStream.IconKind.REMOTE)
        return remote_icon.get_url() if remote_icon else ""

    def _get_icon_filename(self) -> str:
        """Get the cached icon filename from the component ("" if none)."""
        cached_icon = self._first_icon_of_kind(AppStream.IconKind.CACHED)
        return cached_icon.get_filename() if cached_icon else ""

    def _get_icon_cache_path(self, size: str) -> str:
        """Return the AppStream icon cache directory of the remote for ``size``.

        ``size`` is a directory name such as "128x128".
        """
        appstream_dir = Path(self.remote.get_appstream_dir().get_path())
        return str(appstream_dir / "icons" / "flatpak" / size)

    def _get_urls(self) -> dict:
        """Get the donation, homepage and bugtracker URLs of the component."""
        return {
            url_kind: self._get_url(url_kind)
            for url_kind in ('donation', 'homepage', 'bugtracker')
        }

    def _get_url(self, url_kind: str) -> str:
        """Return the URL of the given kind, or "" when the component has none.

        ``url_kind`` is the case-insensitive name of an AppStream.UrlKind
        member, e.g. "homepage".
        """
        # Convert string to AppStream.UrlKind enum
        url_kind_enum = getattr(AppStream.UrlKind, url_kind.upper())
        return self.component.get_url(url_kind_enum) or ""

    def _get_categories(self) -> list:
        """Return the component's categories, lower-cased."""
        return [category.lower() for category in self.component.get_categories()]

    def search(self, keyword: str) -> Match:
        """Search for ``keyword`` (case-insensitively) in the package details.

        The name is checked first, then the ID, then the summary. Fields the
        component does not provide are treated as empty.
        """
        # Lower-case the keyword as well: the fields are lower-cased, so a
        # keyword containing upper-case letters could otherwise never match.
        needle = keyword.lower()
        if needle in (self.name or "").lower():
            return Match.NAME
        if needle in (self.id or "").lower():
            return Match.ID
        if needle in (self.summary or "").lower():
            return Match.SUMMARY
        return Match.NONE

    def __str__(self) -> str:
        return f"{self.name} - {self.summary} ({self.flatpak_bundle})"

    def get_details(self) -> dict:
        """Get all package details including icon and description."""
        return {
            "name": self.name,
            "id": self.id,
            "kind": self.kind,
            "summary": self.summary,
            "description": self.description,
            "version": self.version,
            "icon_url": self.icon_url,
            "icon_path_128": self.icon_path_128,
            "icon_path_64": self.icon_path_64,
            "icon_filename": self.icon_filename,
            "urls": self.urls,
            "developer": self.developer,
            #"architectures": self.architectures,
            "categories": self.categories,
            "bundle_id": self.flatpak_bundle,
            "match_type": self.match.name,
            "repo": self.repo_name,
        }
class AppstreamSearcher:
    """Flatpak AppStream package searcher.

    Keeps, for every enabled remote that was added, the list of
    AppStreamPackage objects parsed from that remote's AppStream catalog, and
    builds the category / collection / installed / updates result lists from
    them.
    """

    # File (relative to the current working directory) in which the Flathub
    # collection listings are cached.
    _COLLECTIONS_FILE = "collections_data.json"

    def __init__(self, refresh=False) -> None:
        """Create a searcher with no remotes loaded.

        Args:
            refresh: When true, the AppStream data of each remote is
                synchronised again before it is parsed.
        """
        self.remotes: dict[str, list[AppStreamPackage]] = {}
        self.refresh_progress = 0
        self.refresh = refresh
        # Collection listings fetched from Flathub, one entry per collection:
        # {'category': <name>, 'data': <decoded API response>}.
        self.collections_db = []
        # Start with empty result lists so every method can rely on them.
        self._initialize_metadata()
        # Define category groups and their titles
        self.category_groups = {
            'system': {
                'installed': 'Installed',
                'updates': 'Updates',
                'repositories': 'Repositories'
            },
            'collections': {
                'trending': 'Trending',
                'popular': 'Popular',
                'recently-added': 'New',
                'recently-updated': 'Updated'
            },
            'categories': {
                'office': 'Productivity',
                'graphics': 'Graphics & Photography',
                'audiovideo': 'Audio & Video',
                'education': 'Education',
                'network': 'Networking',
                'game': 'Games',
                'development': 'Developer Tools',
                'science': 'Science',
                'system': 'System',
                'utility': 'Utilities'
            }
        }

    def add_installation(self, inst: Flatpak.Installation):
        """Add enabled flatpak repositories from Flatpak.Installation"""
        for remote in inst.list_remotes():
            if not remote.get_disabled():
                self.add_remote(remote, inst)

    def add_remote(self, remote: Flatpak.Remote, inst: Flatpak.Installation):
        """Add packages for a given Flatpak.Remote (no-op if already added)."""
        remote_name = remote.get_name()
        if remote_name not in self.remotes:
            self.remotes[remote_name] = self._load_appstream_metadata(remote, inst)

    def _update_appstream(self, remote: Flatpak.Remote, inst: Flatpak.Installation) -> None:
        """Synchronise the AppStream data of ``remote``; failures are logged."""
        remote_name = remote.get_name()
        try:
            if remote_name in ("flathub", "flathub-beta"):
                remote.set_gpg_verify(True)
                inst.modify_remote(remote, None)
            inst.update_appstream_full_sync(remote_name, None, None, True)
        except GLib.Error as e:
            logger.error(f"Failed to update AppStream metadata: {str(e)}")

    def _load_appstream_metadata(self, remote: Flatpak.Remote, inst: Flatpak.Installation) -> list[AppStreamPackage]:
        """Load AppStream metadata and create AppStreamPackage objects.

        The remote's appstream.xml.gz is synchronised first when a refresh was
        requested, or when the file is missing and an internet connection is
        available. Returns an empty list when the file is missing or cannot be
        parsed.
        """
        appstream_file = Path(remote.get_appstream_dir().get_path() + "/appstream.xml.gz")
        # A single guarded sync covers both the explicit refresh and the
        # "file missing" case, so a failed refresh is logged instead of raised
        # and the sync never runs twice for the same remote.
        if self.refresh or (not appstream_file.exists() and check_internet()):
            self._update_appstream(remote, inst)

        if not appstream_file.exists():
            logger.debug(f"AppStream file not found: {appstream_file}")
            return []

        metadata = AppStream.Metadata.new()
        metadata.set_format_style(AppStream.FormatStyle.CATALOG)
        try:
            metadata.parse_file(Gio.File.new_for_path(appstream_file.as_posix()), AppStream.FormatKind.XML)
        except GLib.Error as e:
            logger.error(f"Failed to parse AppStream metadata {appstream_file}: {str(e)}")
            return []

        components: AppStream.ComponentBox = metadata.get_components()
        packages = []
        for index in range(components.get_size()):
            component = components.index_safe(index)
            # Components without a Flatpak bundle cannot be installed from
            # this remote; skip them.
            if component.get_bundle(AppStream.BundleKind.FLATPAK) is None:
                continue
            packages.append(AppStreamPackage(component, remote))
        return packages

    @staticmethod
    def _match_rank(keyword: str, needle: str, package: AppStreamPackage):
        """Rank how well ``package`` matches; lower is better, None is no match.

        ``needle`` is ``keyword`` lower-cased (passed in to avoid recomputing).
        """
        package_id = package.id or ""
        if keyword == package_id:
            return 0  # exact ID
        if needle == (package.name or "").lower():
            return 1  # exact name, ignoring case
        if needle == package_id.lower():
            return 2  # exact ID, ignoring case
        if needle in str(package).lower():
            return 3  # keyword appears somewhere in name/summary/bundle
        return None

    def search_flatpak_repo(self, keyword: str, repo_name: str) -> list[AppStreamPackage]:
        """Return the best match for ``keyword`` in ``repo_name``.

        Matches are preferred in this order: exact ID, exact name (ignoring
        case), exact ID (ignoring case), keyword contained in the package's
        string form. The result holds at most one package; it is empty when
        nothing matches or the repository is unknown.
        """
        needle = keyword.lower()
        best = None
        best_rank = None
        for package in self.remotes.get(repo_name, []):
            rank = self._match_rank(keyword, needle, package)
            if rank is None:
                continue
            if rank == 0:
                # Nothing can beat an exact ID match.
                return [package]
            if best_rank is None or rank < best_rank:
                best, best_rank = package, rank
        return [best] if best is not None else []

    def search_flatpak(self, keyword: str, repo_name=None) -> list[AppStreamPackage]:
        """Search packages matching a keyword.

        With ``repo_name`` only that repository is searched (an unknown name
        yields an empty list); otherwise every loaded repository contributes
        its best match.
        """
        if repo_name:
            repo_names = [repo_name] if repo_name in self.remotes else []
        else:
            repo_names = list(self.remotes)
        search_results = []
        for name in repo_names:
            search_results.extend(self.search_flatpak_repo(keyword, name))
        return search_results

    def get_all_apps(self, repo_name=None) -> list[AppStreamPackage]:
        """Get all available apps from specified or all repositories.

        A new list is returned, so callers may modify it freely.
        """
        if repo_name:
            return list(self.remotes.get(repo_name, []))
        all_packages = []
        for packages in self.remotes.values():
            all_packages.extend(packages)
        return all_packages

    def get_categories_summary(self, repo_name=None) -> dict:
        """Get a summary of all apps grouped by category."""
        categories = {}
        for app in self.get_all_apps(repo_name):
            for category in app.categories:
                categories.setdefault(category, []).append(app)
        return categories

    def get_installed_apps(self, system=False) -> list[tuple[str, str, str]]:
        """List installed refs of the user or system installation.

        Returns:
            ``(app_id, origin_remote, "user" | "system")`` tuples, one per
            distinct app ID, in the order the installation lists them.
        """
        installation = get_installation(system)
        scope = "user" if system is False else "system"
        seen = set()
        unique_installed = []
        for ref in installation.list_installed_refs():
            app_id = ref.get_name()
            if app_id in seen:
                continue
            seen.add(app_id)
            unique_installed.append((app_id, ref.get_origin(), scope))
        return unique_installed

    def check_updates(self, system=False) -> list[tuple[str, str, str]]:
        """Check for available updates for installed Flatpak applications.

        Returns:
            ``(app_id, origin_remote, "user" | "system")`` tuples.
        """
        installation = get_installation(system)
        scope = "user" if system is False else "system"
        return [
            (ref.get_name(), ref.get_origin(), scope)
            for ref in installation.list_installed_refs_for_update(None)
        ]

    def _remember_collection(self, category, data) -> None:
        """Store ``data`` for ``category``, replacing an earlier entry."""
        for entry in self.collections_db:
            if entry['category'] == category:
                entry['data'] = data
                return
        self.collections_db.append({'category': category, 'data': data})

    def fetch_flathub_category_apps(self, category):
        """Fetch applications from Flathub API for the specified category.

        Responses for collection categories are also kept in
        ``self.collections_db``. Returns the decoded JSON response, or None
        when the request fails, returns a non-200 status or is not valid JSON.
        """
        is_collection = category in self.category_groups['collections']
        # URL encode the category to handle special characters
        encoded_category = quote_plus(category)
        if is_collection:
            url = f"https://flathub.org/api/v2/collection/{encoded_category}"
        else:
            url = f"https://flathub.org/api/v2/collection/category/{encoded_category}"
        try:
            response = requests.get(url, timeout=10)
            if response.status_code != 200:
                logger.error(f"Failed to fetch apps: Status code {response.status_code}")
                return None
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError: body was not valid JSON (older requests versions).
            logger.error(f"Error fetching apps: {str(e)}")
            return None
        if is_collection:
            self._remember_collection(category, data)
        return data

    def save_collections_data(self, filename='collections_data.json'):
        """Save all collected collections data to a JSON file.

        Nothing is written when no collection data has been fetched.
        """
        if not self.collections_db:
            return
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.collections_db, f, indent=2, ensure_ascii=False)
        except IOError as e:
            logger.error(f"Error saving collections data: {str(e)}")

    def update_collection_results(self, new_collection_results):
        """Merge new results into ``self.collection_results``.

        A new result replaces the existing entry with the same ID in place;
        results with an unseen ID are appended.
        """
        merged = list(self.collection_results)
        # Map each ID to the position of its first occurrence.
        position = {}
        for index, existing in enumerate(merged):
            position.setdefault(existing.id, index)
        for new_result in new_collection_results:
            index = position.get(new_result.id)
            if index is None:
                position[new_result.id] = len(merged)
                merged.append(new_result)
            else:
                merged[index] = new_result
        self.collection_results = merged

    def refresh_local(self, system=False):
        """Rebuild the installed and updates result lists.

        Returns:
            ``(installed_results, updates_results)``.
        """
        # make sure to reset these to empty before refreshing.
        self.installed_results = []
        self.updates_results = []
        total_categories = sum(len(categories) for categories in self.category_groups.values())
        current_category = 0
        # Search for each app in local repositories
        searcher = get_reposearcher(system)
        for categories in self.category_groups.values():
            for category in categories:
                # Only the "installed" and "updates" categories do any work.
                self._process_system_category(searcher, category, system)
                current_category += 1
                self.refresh_progress = (current_category / total_categories) * 100
        return self.installed_results, self.updates_results

    def retrieve_metadata(self, system=False):
        """Retrieve and refresh metadata for Flatpak repositories.

        Returns the tuple described by _get_current_results(); when offline,
        the result of _handle_offline_mode().
        """
        self._initialize_metadata()
        if not check_internet():
            return self._handle_offline_mode()
        searcher = get_reposearcher(system, True)
        self.all_apps = searcher.get_all_apps()
        return self._process_categories(searcher, system)

    def _initialize_metadata(self):
        """Initialize empty lists for metadata storage."""
        self.category_results = []
        self.collection_results = []
        self.installed_results = []
        self.updates_results = []
        self.all_apps = []

    def _handle_offline_mode(self):
        """Build results from the cached collections file when offline.

        Returns ``(None, [], [], [], [])`` when the cache cannot be read.
        """
        try:
            with open(self._COLLECTIONS_FILE, 'r', encoding='utf-8') as f:
                collections_data = json.load(f)
            return self._process_offline_data(collections_data)
        except (IOError, json.JSONDecodeError) as e:
            logger.error(f"Error loading offline data: {str(e)}")
            return None, [], [], [], []

    def _process_offline_data(self, collections_data):
        """Process cached collections data when offline."""
        for collection in collections_data:
            if collection['category'] not in self.category_groups['collections']:
                continue
            for app in collection['data'].get('hits', []):
                self.collection_results.extend(self.search_flatpak(app['app_id'], 'flathub'))
        return self._get_current_results()

    def _process_categories(self, searcher, system=False):
        """Process every category group and return the collected results."""
        total_categories = sum(len(categories) for categories in self.category_groups.values())
        current_category = 0
        for categories in self.category_groups.values():
            for category in categories:
                if category not in self.category_groups['system']:
                    self._process_category(searcher, category, current_category, total_categories)
                else:
                    self._process_system_category(searcher, category, system)
                current_category += 1
                # Report completed categories so progress can reach 100.
                self.refresh_progress = (current_category / total_categories) * 100
        return self._get_current_results()

    def _process_category(self, searcher, category, current_category, total_categories):
        """Process a single category and retrieve its metadata.

        Cached collection data is applied first; the Flathub API is queried
        only when _should_refresh_category() says the cache is stale.
        """
        try:
            with open(self._COLLECTIONS_FILE, 'r', encoding='utf-8') as f:
                collections_data = json.load(f)
        except FileNotFoundError:
            # No cache has been written yet; not an error.
            logger.debug(f"Collections cache not found: {self._COLLECTIONS_FILE}")
        except (IOError, json.JSONDecodeError) as e:
            logger.error(f"Error loading collections data: {str(e)}")
        else:
            self._update_from_collections(collections_data, category)
        if self._should_refresh_category(category):
            self._refresh_category_data(searcher, category)
        self.refresh_progress = (current_category / total_categories) * 100

    def _update_from_collections(self, collections_data, category):
        """Update results from cached collections data."""
        for collection in collections_data:
            if collection['category'] != category:
                continue
            for app in collection['data'].get('hits', []):
                self.collection_results.extend(self.search_flatpak(app['app_id'], 'flathub'))

    def _should_refresh_category(self, category):
        """Return True when the cache file is missing or older than 24 hours."""
        try:
            mod_time = os.path.getmtime(self._COLLECTIONS_FILE)
        except OSError:
            return True
        return (time.time() - mod_time) > 24 * 3600

    def _refresh_category_data(self, searcher, category):
        """Refresh category data from Flathub API."""
        try:
            api_data = self.fetch_flathub_category_apps(category)
        except requests.RequestException as e:
            logger.error(f"Error refreshing category {category}: {str(e)}")
            return
        if not api_data:
            return
        is_collection = category in self.category_groups['collections']
        for app in api_data.get('hits', []):
            search_result = searcher.search_flatpak(app['app_id'], 'flathub')
            if is_collection:
                self.update_collection_results(search_result)
            else:
                self.category_results.extend(search_result)

    def _process_system_category(self, searcher, category, system=False):
        """Process system-related categories ("installed" and "updates")."""
        if "installed" in category:
            entries, target = searcher.get_installed_apps(system), self.installed_results
        elif "updates" in category:
            entries, target = searcher.check_updates(system), self.updates_results
        else:
            return
        for app_id, repo_name, _repo_type in entries:
            if repo_name:
                target.extend(searcher.search_flatpak(app_id, repo_name))

    def _get_current_results(self):
        """Return (category, collection, installed, updates, all_apps) results."""
        return (self.category_results,
                self.collection_results,
                self.installed_results,
                self.updates_results,
                self.all_apps)
def install_flatpak(app: AppStreamPackage, repo_name=None, system=False) -> tuple[bool, str]:
    """
    Install a Flatpak package using a transaction.

    Args:
        app (AppStreamPackage): The package to install.
        repo_name (str): Optional repository name to use for installation;
            "flathub" is used when omitted.
        system (Optional[bool]): Whether to operate on user or system installation

    Returns:
        tuple[bool, str]: (success, message)
    """
    if not repo_name:
        repo_name = "flathub"
    # Building the transaction and queueing the operation can raise
    # GLib.Error as well, so they are guarded together with run(); the
    # function then always honours its (success, message) contract.
    try:
        installation = get_installation(system)
        transaction = Flatpak.Transaction.new_for_installation(installation)
        transaction.add_install(repo_name, app.flatpak_bundle, None)
        transaction.run()
    except GLib.Error as e:
        return False, f"Installation failed: {e}"
    return True, f"Successfully installed {app.id}"
def remove_flatpak(app: AppStreamPackage, repo_name=None, system=False) -> tuple[bool, str]:
    """
    Remove a Flatpak package using a transaction.

    Args:
        app (AppStreamPackage): The package to remove.
        repo_name (str): Accepted for symmetry with install_flatpak; not used
            when uninstalling.
        system (Optional[bool]): Whether to operate on user or system installation

    Returns:
        tuple[bool, str]: (success, message)
    """
    # Queueing the uninstall can raise GLib.Error too (not only run()), so
    # the whole sequence is guarded.
    try:
        installation = get_installation(system)
        transaction = Flatpak.Transaction.new_for_installation(installation)
        transaction.add_uninstall(app.flatpak_bundle)
        transaction.run()
    except GLib.Error as e:
        return False, f"Failed to remove {app.id}: {e}"
    return True, f"Successfully removed {app.id}"
def update_flatpak(app: AppStreamPackage, repo_name=None, system=False) -> tuple[bool, str]:
    """
    Update a Flatpak package using a transaction.

    Args:
        app (AppStreamPackage): The package to update.
        repo_name (str): Accepted for symmetry with install_flatpak; not used
            when updating.
        system (Optional[bool]): Whether to operate on user or system installation

    Returns:
        tuple[bool, str]: (success, message)
    """
    # Queueing the update can raise GLib.Error too (not only run()), so the
    # whole sequence is guarded.
    try:
        installation = get_installation(system)
        transaction = Flatpak.Transaction.new_for_installation(installation)
        transaction.add_update(app.flatpak_bundle)
        transaction.run()
    except GLib.Error as e:
        return False, f"Failed to update {app.id}: {e}"
    return True, f"Successfully updated {app.id}"
def get_installation(system=False):
    """Return the user installation when ``system`` is False, else the system one.

    Only the literal ``False`` selects the user installation; any other value
    selects the system-wide installation.
    """
    if system is False:
        return Flatpak.Installation.new_user()
    return Flatpak.Installation.new_system()
def get_reposearcher(system=False, refresh=False):
    """Build an AppstreamSearcher loaded with the chosen installation's remotes.

    Args:
        system: Passed to get_installation() to pick user or system scope.
        refresh: Passed to AppstreamSearcher to request a metadata refresh.
    """
    repo_searcher = AppstreamSearcher(refresh)
    repo_searcher.add_installation(get_installation(system))
    return repo_searcher
def check_internet():
    """Check if internet connection is available.

    Sends a HEAD request to flathub.org with a 3 second timeout and returns
    True when a response arrives, False on any request failure.
    """
    try:
        requests.head('https://flathub.org', timeout=3)
    except requests.RequestException:
        # Covers connection errors and timeouts alike; a read timeout is not
        # a ConnectionError and would otherwise escape to the caller.
        return False
    return True
def repotoggle(repo, toggle=True, system=False):
    """
    Enable or disable a Flatpak repository

    Args:
        repo (str): Name of the repository to toggle
        toggle (bool): True to enable, False to disable
        system (bool): Whether to operate on user or system installation

    Returns:
        tuple: (success, message)
    """
    if not repo:
        return False, "Repository name cannot be empty"
    installation = get_installation(system)
    try:
        remote = installation.get_remote_by_name(repo)
        if not remote:
            return False, f"Repository '{repo}' not found."
        remote.set_disabled(not toggle)
        # Persist the remote's new disabled status
        if installation.modify_remote(remote, None):
            state = "enabled" if toggle else "disabled"
            return True, f"Successfully {state} {repo}."
    except GLib.Error as e:
        return False, f"Failed to toggle repository: {str(e)}"
    return False, "Operation failed"
def repolist(system=False):
    """Return the remotes configured for the user or system installation."""
    return get_installation(system).list_remotes()
def repodelete(repo, system=False):
    """Remove the remote named ``repo`` from the user or system installation."""
    target_installation = get_installation(system)
    target_installation.remove_remote(repo)
def repoadd(repofile, system=False):
    """Add a new repository using a .flatpakrepo file.

    Args:
        repofile (str): Local path or URL of a file ending in .flatpakrepo.
            URLs are downloaded first.
        system (bool): Whether to operate on user or system installation.

    Returns:
        tuple: (success, message)
    """
    # Validate the input before touching the installation.
    if not repofile.endswith('.flatpakrepo'):
        return False, "Repository file path or URL must end with .flatpakrepo extension."

    if repofile_is_url(repofile):
        source_url = repofile
        try:
            repofile = download_repo(source_url)
        except (argparse.ArgumentTypeError, requests.RequestException, OSError):
            return False, f"Repository file '{source_url}' could not be downloaded."
        # Only the download has happened here; the repository is added below.
        print(f"\nRepository file downloaded: {repofile}")

    if not os.path.exists(repofile):
        return False, f"Repository file '{repofile}' does not exist."

    # Get repository title from file name
    title = os.path.basename(repofile).replace('.flatpakrepo', '')

    # Read the repository file
    try:
        with open(repofile, 'rb') as f:
            repo_data = f.read()
    except IOError as e:
        return False, f"Failed to read repository file: {str(e)}"

    user = "system" if system else "user"
    try:
        installation = get_installation(system)
        existing_repos = installation.list_remotes()

        # Check for duplicate title (case insensitive)
        existing_titles = [repo.get_name().casefold() for repo in existing_repos]
        if title.casefold() in existing_titles:
            return False, "A repository with this title already exists."

        # Create a new remote from the repository file contents
        remote = Flatpak.Remote.new_from_file(title, GLib.Bytes.new(repo_data))

        # Compare URLs with trailing slashes removed
        new_url = remote.get_url().rstrip('/')
        existing_urls = [repo.get_url().rstrip('/') for repo in existing_repos]
        if new_url in existing_urls:
            return False, f"A repository with URL '{new_url}' already exists."

        remote.set_gpg_verify(True)
        installation.add_remote(remote, True, None)
    except GLib.Error as e:
        return False, f"Failed to add repository: {str(e)}"
    return True, f"{remote.get_name()} repository successfully added for {user} installation."
def repofile_is_url(string):
    """Check if a string is a URL with both a scheme and a network location.

    Returns False for plain file paths and for input urlparse() rejects.
    """
    try:
        parsed = urlparse(string)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL (e.g. unbalanced IPv6 brackets);
        # TypeError/AttributeError: input that is not str/bytes.
        return False
    return bool(parsed.scheme and parsed.netloc)
def download_repo(url, timeout=10):
    """Download a repository file from URL into the system temp directory.

    The file keeps the last path component of the URL as its name ("repo"
    when the URL path has none) and overwrites an existing file of that name.

    Args:
        url (str): URL of the repository file.
        timeout (float): Seconds to wait for the server before giving up.

    Returns:
        str: Path of the downloaded file.

    Raises:
        argparse.ArgumentTypeError: If the download or the write fails.
    """
    # Create a deterministic filename based on the URL
    filename = os.path.basename(urlparse(url).path) or 'repo'
    # NOTE(review): a predictable name in a shared temp directory can be
    # pre-created by another local user — confirm this is acceptable.
    tmp_path = Path(tempfile.gettempdir()) / filename
    try:
        # Without a timeout an unresponsive server would block forever.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            # Write the file in chunks, overwriting if it exists
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
    except requests.RequestException as e:
        raise argparse.ArgumentTypeError(f"Failed to download repository file: {str(e)}") from e
    except IOError as e:
        raise argparse.ArgumentTypeError(f"Failed to save repository file: {str(e)}") from e
    return str(tmp_path)
def main():
    """Parse the command line and run the first requested operation.

    Repository operations are checked first, then package operations, then
    the listing/search operations. Only one operation runs per invocation.
    """
    parser = argparse.ArgumentParser(description='Search Flatpak packages')
    parser.add_argument('--id', help='Application ID to search for')
    parser.add_argument('--repo', help='Filter results to specific repository')
    parser.add_argument('--list-all', action='store_true', help='List all available apps')
    parser.add_argument('--categories', action='store_true', help='Show apps grouped by category')
    parser.add_argument('--list-installed', action='store_true',
                        help='List all installed Flatpak applications')
    parser.add_argument('--check-updates', action='store_true',
                        help='Check for available updates')
    parser.add_argument('--list-repos', action='store_true',
                        help='List all configured Flatpak repositories')
    parser.add_argument('--add-repo', type=str, metavar='REPO_FILE',
                        help='Add a new repository from a .flatpakrepo file')
    parser.add_argument('--remove-repo', type=str, metavar='REPO_NAME',
                        help='Remove a Flatpak repository')
    parser.add_argument('--toggle-repo', type=str,
                        metavar='ENABLE/DISABLE',
                        help='Enable or disable the repository given with --repo')
    parser.add_argument('--install', type=str, metavar='APP_ID',
                        help='Install a Flatpak package')
    parser.add_argument('--remove', type=str, metavar='APP_ID',
                        help='Remove a Flatpak package')
    parser.add_argument('--update', type=str, metavar='APP_ID',
                        help='Update a Flatpak package')
    parser.add_argument('--system', action='store_true', help='Install as system instead of user')
    parser.add_argument('--refresh', action='store_true',
                        help='Refresh AppStream metadata before running')
    parser.add_argument('--refresh-local', action='store_true',
                        help='Rebuild the installed and updates lists')
    args = parser.parse_args()

    # Handle repository operations (no AppStream metadata needed)
    repo_operations = (
        (args.toggle_repo, handle_repo_toggle),
        (args.list_repos, handle_list_repos),
        (args.add_repo, handle_add_repo),
        (args.remove_repo, handle_remove_repo),
    )
    for requested, handler in repo_operations:
        if requested:
            handler(args)
            return

    # --refresh makes the searcher re-synchronise metadata while loading.
    searcher = get_reposearcher(args.system, args.refresh)

    # Handle package operations, then information operations
    searcher_operations = (
        (args.install, handle_install),
        (args.remove, handle_remove),
        (args.update, handle_update),
        (args.list_installed, handle_list_installed),
        (args.check_updates, handle_check_updates),
        (args.list_all, handle_list_all),
        (args.categories, handle_categories),
        (args.id, handle_search),
    )
    for requested, handler in searcher_operations:
        if requested:
            handler(args, searcher)
            return

    if args.refresh_local:
        installed, updates = searcher.refresh_local(args.system)
        print(f"\nInstalled packages found in metadata: {len(installed)}")
        print(f"Updates found in metadata: {len(updates)}")
        return
    if args.refresh:
        # The refresh already happened while the searcher was built.
        print("AppStream metadata refreshed.")
        return
    print("Missing options. Use -h for help.")
def handle_repo_toggle(args):
    """Enable or disable the repository named by --repo and print the outcome.

    The repository is enabled when --toggle-repo is "true" or "enable"
    (case-insensitive) and disabled for any other value. Exits with status 1
    when --repo is missing.
    """
    if not args.repo:
        print("Error: must specify a repo.")
        sys.exit(1)
    enable = args.toggle_repo.lower() in ('true', 'enable')
    try:
        _success, message = repotoggle(args.repo, enable, args.system)
    except GLib.Error as e:
        print(f"{str(e)}")
    else:
        print(f"{message}")
def handle_list_repos(args):
    """Print the name and URL of every configured repository."""
    print("\nConfigured Repositories:")
    for remote in repolist(args.system):
        name, url = remote.get_name(), remote.get_url()
        print(f"- {name} ({url})")
def handle_add_repo(args):
    """Add the repository given by --add-repo and print the resulting message."""
    try:
        _success, message = repoadd(args.add_repo, args.system)
    except GLib.Error as e:
        print(f"{str(e)}")
    else:
        print(f"{message}")
def handle_remove_repo(args):
    """Remove the repository given by --remove-repo and print the outcome."""
    try:
        repodelete(args.remove_repo, args.system)
    except GLib.Error as e:
        # Report the failure like the other repository handlers do, instead
        # of ending with a traceback.
        print(f"{str(e)}")
        return
    print(f"\nRepository removed successfully: {args.remove_repo}")
def handle_install(args, searcher):
    """Install the package given by --install and print the outcome.

    Candidates returned by the search are tried in order until one installs
    successfully; the message of the last attempt is printed.
    """
    candidates = searcher.search_flatpak(args.install, args.repo)
    result_message = f"No package matching '{args.install}' was found."
    for package in candidates:
        # Without --repo, install from the remote the match was found in
        # rather than always falling back to "flathub".
        repo_name = args.repo or package.repo_name
        try:
            success, message = install_flatpak(package, repo_name, args.system)
        except GLib.Error as e:
            result_message = f"Installation of {args.install} failed: {str(e)}"
            continue
        result_message = f"{message}"
        if success:
            break
    print(result_message)
def handle_remove(args, searcher):
    """Remove the package given by --remove and print the outcome.

    Candidates returned by the search are tried in order until one is removed
    successfully; the message of the last attempt is printed.
    """
    candidates = searcher.search_flatpak(args.remove, args.repo)
    result_message = f"No package matching '{args.remove}' was found."
    for package in candidates:
        try:
            success, message = remove_flatpak(package, args.repo, args.system)
        except GLib.Error as e:
            result_message = f"Removal of {args.remove} failed: {str(e)}"
            continue
        result_message = f"{message}"
        if success:
            break
    print(result_message)
def handle_update(args, searcher):
    """Update the package given by --update and print the outcome.

    Candidates returned by the search are tried in order until one is updated
    successfully; the message of the last attempt is printed.
    """
    candidates = searcher.search_flatpak(args.update, args.repo)
    result_message = f"No package matching '{args.update}' was found."
    for package in candidates:
        try:
            success, message = update_flatpak(package, args.repo, args.system)
        except GLib.Error as e:
            result_message = f"Update of {args.update} failed: {str(e)}"
            continue
        result_message = f"{message}"
        if success:
            break
    print(result_message)
def handle_list_installed(args, searcher):
    """Print every installed application with its repository and scope."""
    entries = searcher.get_installed_apps(args.system)
    lines = [f"\nInstalled Flatpak Applications ({len(entries)}):"]
    lines.extend(
        f"{app_id} (Repository: {repo_name}, Installation: {repo_type})"
        for app_id, repo_name, repo_type in entries
    )
    print("\n".join(lines))
def handle_check_updates(args, searcher):
    """Print every application that has an update available."""
    updates = searcher.check_updates(args.system)
    print(f"\nAvailable Updates ({len(updates)}):")
    # check_updates() yields (app_id, repo_name, repo_type); unpacking in any
    # other order swaps the application ID and the repository in the output.
    for app_id, repo_name, repo_type in updates:
        print(f"{app_id} (Repository: {repo_name}, Installation: {repo_type})")
def handle_list_all(args, searcher):
    """Print the name and categories of every available application."""
    separator = "-" * 50
    for package in searcher.get_all_apps(args.repo):
        info = package.get_details()
        category_text = ', '.join(info['categories'])
        print(f"Name: {info['name']}")
        print(f"Categories: {category_text}")
        print(separator)
def handle_categories(args, searcher):
    """Print every category in upper case followed by its applications."""
    grouped = searcher.get_categories_summary(args.repo)
    for category in grouped:
        print(f"\n{category.upper()}:")
        for package in grouped[category]:
            print(f" - {package.name} ({package.id})")
def handle_search(args, searcher):
    """Search for --id (optionally within --repo) and print each match in full."""
    if args.repo:
        matches = searcher.search_flatpak(args.id, args.repo)
    else:
        matches = searcher.search_flatpak(args.id)

    # (label, key) pairs in the order they are printed.
    leading_fields = (
        ("Name", 'name'),
        ("ID", 'id'),
        ("Kind", 'kind'),
        ("Summary", 'summary'),
        ("Description", 'description'),
        ("Version", 'version'),
        ("Icon URL", 'icon_url'),
        ("Icon PATH 128x128", 'icon_path_128'),
        ("Icon PATH 64x64", 'icon_path_64'),
        ("Icon FILE", 'icon_filename'),
        ("Developer", 'developer'),
        ("Categories", 'categories'),
    )
    url_fields = (
        ("Donation URL", 'donation'),
        ("Homepage URL", 'homepage'),
        ("Bug Tracker URL", 'bugtracker'),
    )
    trailing_fields = (
        ("Bundle ID", 'bundle_id'),
        ("Match Type", 'match_type'),
        ("Repo", 'repo'),
    )

    for package in matches:
        details = package.get_details()
        for label, key in leading_fields:
            print(f"{label}: {details[key]}")
        urls = details['urls']
        for label, key in url_fields:
            print(f"{label}: {urls[key]}")
        for label, key in trailing_fields:
            print(f"{label}: {details[key]}")
        print("-" * 50)
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()