flatpost/fp_turbo.py
2025-04-04 11:09:21 -06:00

2770 lines
104 KiB
Python
Executable file

#!/usr/bin/env python3
### Documentation largely taken from:
###
### 1. https://lazka.github.io/pgi-docs/Flatpak-1.0
### 2. https://flathub.org/api/v2/docs#/
###
### Classes AppStreamPackage and AppStreamSearcher extended from original by Tim Tim Lauridsen at:
###
### https://github.com/timlau/yumex-ng/blob/main/yumex/backend/flatpak/search.py
# Original GPL v3 Code Copyright:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2024 Tim Lauridsen
#
# Modifications copyright notice
# Copyright (C) 2025 Thomas Crider
#
# Original code has been completely removed except
# AppStreamPackage and AppStreamSearcher classes
# which have been modified and extended.
import gi
gi.require_version("AppStream", "1.0")
gi.require_version("Flatpak", "1.0")
from gi.repository import Flatpak, GLib, Gio, AppStream
from pathlib import Path
import logging
from enum import IntEnum
import argparse
import requests
from urllib.parse import quote_plus, urlparse
import tempfile
import os
import sys
import json
import time
import dbus
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Match(IntEnum):
NAME = 1
ID = 2
SUMMARY = 3
NONE = 4
class AppStreamComponentKind(IntEnum):
"""AppStream Component Kind enumeration."""
UNKNOWN = 0
"""Type invalid or not known."""
GENERIC = 1
"""A generic (= without specialized type) component."""
DESKTOP_APP = 2
"""An application with a .desktop-file."""
CONSOLE_APP = 3
"""A console application."""
WEB_APP = 4
"""A web application."""
SERVICE = 5
"""A system service launched by the init system."""
ADDON = 6
"""An extension of existing software, which does not run standalone."""
RUNTIME = 7
"""An application runtime platform."""
FONT = 8
"""A font."""
CODEC = 9
"""A multimedia codec."""
INPUT_METHOD = 10
"""An input-method provider."""
OPERATING_SYSTEM = 11
"""A computer operating system."""
FIRMWARE = 12
"""Firmware."""
DRIVER = 13
"""A driver."""
LOCALIZATION = 14
"""Software localization (usually l10n resources)."""
REPOSITORY = 15
"""A remote software or data source."""
ICON_THEME = 16
"""An icon theme following the XDG specification."""
class AppStreamPackage:
def __init__(self, comp: AppStream.Component, remote: Flatpak.Remote) -> None:
self.component: AppStream.Component = comp
self.remote: Flatpak.Remote = remote
self.repo_name: str = remote.get_name()
bundle: AppStream.Bundle = comp.get_bundle(AppStream.BundleKind.FLATPAK)
self.flatpak_bundle: str = bundle.get_id()
self.match = Match.NONE
# Get icon and description
self.icon_url = self._get_icon_url()
self.icon_path_128 = self._get_icon_cache_path("128x128")
self.icon_path_64 = self._get_icon_cache_path("64x64")
self.icon_filename = self._get_icon_filename()
self.description = self.component.get_description()
# Get URLs from the component
self.urls = self._get_urls()
self.developer = self.component.get_developer().get_name()
self.categories = self._get_categories()
@property
def id(self) -> str:
return self.component.get_id()
@property
def name(self) -> str:
return self.component.get_name()
@property
def summary(self) -> str:
return self.component.get_summary()
@property
def version(self) -> str|None:
releases = self.component.get_releases_plain()
if releases:
release = releases.index_safe(0)
if release:
version = release.get_version()
return version
return None
@property
def kind(self):
kind = self.component.get_kind()
kind_str = str(kind)
for member in AppStreamComponentKind:
if member.name in kind_str:
return member.name
def _get_icon_url(self) -> str:
"""Get the remote icon URL from the component"""
icons = self.component.get_icons()
# Find the first REMOTE icon
remote_icon = next((icon for icon in icons if icon.get_kind() == AppStream.IconKind.REMOTE), None)
return remote_icon.get_url() if remote_icon else ""
def _get_icon_filename(self) -> str:
"""Get the cached icon filename from the component"""
icons = self.component.get_icons()
# Find the first CACHED icon
cached_icon = next((icon for icon in icons if icon.get_kind() == AppStream.IconKind.CACHED), None)
return cached_icon.get_filename() if cached_icon else ""
def _get_icon_cache_path(self, size: str) -> str:
# Appstream icon cache path for the flatpak repo queried
icon_cache_path = Path(self.remote.get_appstream_dir().get_path() + "/icons/flatpak/" + size + "/")
return str(icon_cache_path)
def _get_urls(self) -> dict:
"""Get URLs from the component"""
urls = {
'donation': self._get_url('donation'),
'homepage': self._get_url('homepage'),
'bugtracker': self._get_url('bugtracker')
}
return urls
def _get_url(self, url_kind: str) -> str:
"""Helper method to get a specific URL type"""
# Convert string to AppStream.UrlKind enum
url_kind_enum = getattr(AppStream.UrlKind, url_kind.upper())
url = self.component.get_url(url_kind_enum)
if url:
return url
return ""
def _get_categories(self) -> list:
categories_fetch = self.component.get_categories()
categories = []
for category in categories_fetch:
categories.append(category.lower())
return categories
def search(self, keyword: str) -> Match:
"""Search for keyword in package details"""
if keyword in self.name.lower():
return Match.NAME
elif keyword in self.id.lower():
return Match.ID
elif keyword in self.summary.lower():
return Match.SUMMARY
else:
return Match.NONE
def __str__(self) -> str:
return f"{self.name} - {self.summary} ({self.flatpak_bundle})"
def get_details(self) -> dict:
"""Get all package details including icon and description"""
return {
"name": self.name,
"id": self.id,
"kind": self.kind,
"summary": self.summary,
"description": self.description,
"version": self.version,
"icon_url": self.icon_url,
"icon_path_128": self.icon_path_128,
"icon_path_64": self.icon_path_64,
"icon_filename": self.icon_filename,
"urls": self.urls,
"developer": self.developer,
#"architectures": self.architectures,
"categories": self.categories,
"bundle_id": self.flatpak_bundle,
"match_type": self.match.name,
"repo": self.repo_name
}
class AppstreamSearcher:
"""Flatpak AppStream Package seacher"""
def __init__(self, refresh=False) -> None:
self.remotes: dict[str, list[AppStreamPackage]] = {}
self.refresh_progress = 0
self.refresh = refresh
# Define category groups and their titles
self.category_groups = {
'system': {
'installed': 'Installed',
'updates': 'Updates',
'repositories': 'Repositories'
},
'collections': {
'trending': 'Trending',
'popular': 'Popular',
'recently-added': 'New',
'recently-updated': 'Updated'
},
'categories': {
'office': 'Productivity',
'graphics': 'Graphics & Photography',
'audiovideo': 'Audio & Video',
'education': 'Education',
'network': 'Networking',
'game': 'Games',
'development': 'Developer Tools',
'science': 'Science',
'system': 'System',
'utility': 'Utilities'
}
}
self.subcategory_groups = {
'audiovideo': {
'audiovideoediting': 'Audio & Video Editing',
'discburning': 'Disc Burning',
'midi': 'Midi',
'mixer': 'Mixer',
'player': 'Player',
'recorder': 'Recorder',
'sequencer': 'Sequencer',
'tuner': 'Tuner',
'tv': 'TV'
},
'development': {
'building': 'Building',
'database': 'Database',
'debugger': 'Debugger',
'guidesigner': 'GUI Designer',
'ide': 'IDE',
'profiling': 'Profiling',
'revisioncontrol': 'Revision Control',
'translation': 'Translation',
'webdevelopment': 'Web Development'
},
'game': {
'actiongame': 'Action Games',
'adventuregame': 'Adventure Games',
'arcadegame': 'Arcade Games',
'blocksgame': 'Blocks Games',
'boardgame': 'Board Games',
'cardgame': 'Card Games',
'emulator': 'Emulators',
'kidsgame': 'Kids\' Games',
'logicgame': 'Logic Games',
'roleplaying': 'Role Playing',
'shooter': 'Shooter',
'simulation': 'Simulation',
'sportsgame': 'Sports Games',
'strategygame': 'Strategy Games'
},
'graphics': {
'2dgraphics': '2D Graphics',
'3dgraphics': '3D Graphics',
'ocr': 'OCR',
'photography': 'Photography',
'publishing': 'Publishing',
'rastergraphics': 'Raster Graphics',
'scanning': 'Scanning',
'vectorgraphics': 'Vector Graphics',
'viewer': 'Viewer'
},
'network': {
'chat': 'Chat',
'email': 'Email',
'feed': 'Feed',
'filetransfer': 'File Transfer',
'hamradio': 'Ham Radio',
'instantmessaging': 'Instant Messaging',
'ircclient': 'IRC Client',
'monitor': 'Monitor',
'news': 'News',
'p2p': 'P2P',
'remoteaccess': 'Remote Access',
'telephony': 'Telephony',
'videoconference': 'Video Conference',
'webbrowser': 'Web Browser',
'webdevelopment': 'Web Development'
},
'office': {
'calendar': 'Calendar',
'chart': 'Chart',
'contactmanagement': 'Contact Management',
'database': 'Database',
'dictionary': 'Dictionary',
'email': 'Email',
'finance': 'Finance',
'presentation': 'Presentation',
'projectmanagement': 'Project Management',
'publishing': 'Publishing',
'spreadsheet': 'Spreadsheet',
'viewer': 'Viewer',
'wordprocessor': 'Word Processor'
},
'system': {
'emulator': 'Emulators',
'filemanager': 'File Manager',
'filesystem': 'Filesystem',
'filetools': 'File Tools',
'monitor': 'Monitor',
'security': 'Security',
'terminalemulator': 'Terminal Emulator'
},
'utility': {
'accessibility': 'Accessibility',
'archiving': 'Archiving',
'calculator': 'Calculator',
'clock': 'Clock',
'compression': 'Compression',
'filetools': 'File Tools',
'telephonytools': 'Telephony Tools',
'texteditor': 'Text Editor',
'texttools': 'Text Tools'
}
}
def add_installation(self, inst: Flatpak.Installation):
"""Add enabled flatpak repositories from Flatpak.Installation"""
remotes = inst.list_remotes()
for remote in remotes:
if not remote.get_disabled():
self.add_remote(remote, inst)
def add_remote(self, remote: Flatpak.Remote, inst: Flatpak.Installation):
"""Add packages for a given Flatpak.Remote"""
remote_name = remote.get_name()
if remote_name not in self.remotes:
self.remotes[remote_name] = self._load_appstream_metadata(remote, inst)
def _load_appstream_metadata(self, remote: Flatpak.Remote, inst: Flatpak.Installation) -> list[AppStreamPackage]:
"""load AppStrean metadata and create AppStreamPackage objects"""
packages = []
metadata = AppStream.Metadata.new()
metadata.set_format_style(AppStream.FormatStyle.CATALOG)
if self.refresh:
if remote.get_name() == "flathub" or remote.get_name() == "flathub-beta":
remote.set_gpg_verify(True)
inst.modify_remote(remote, None)
inst.update_appstream_full_sync(remote.get_name(), None, None, True)
appstream_file = Path(remote.get_appstream_dir().get_path() + "/appstream.xml.gz")
if not appstream_file.exists() and check_internet():
try:
if remote.get_name() == "flathub" or remote.get_name() == "flathub-beta":
remote.set_gpg_verify(True)
inst.modify_remote(remote, None)
inst.update_appstream_full_sync(remote.get_name(), None, None, True)
except GLib.Error as e:
logger.error(f"Failed to update AppStream metadata: {str(e)}")
if appstream_file.exists():
metadata.parse_file(Gio.File.new_for_path(appstream_file.as_posix()), AppStream.FormatKind.XML)
components: AppStream.ComponentBox = metadata.get_components()
i = 0
for i in range(components.get_size()):
component = components.index_safe(i)
#if component.get_kind() == AppStream.ComponentKind.DESKTOP_APP:
packages.append(AppStreamPackage(component, remote))
return packages
else:
logger.debug(f"AppStream file not found: {appstream_file}")
return []
def search_flatpak_repo(self, keyword: str, repo_name: str) -> list[AppStreamPackage]:
search_results = []
packages = self.remotes[repo_name]
found = None
for package in packages:
# Try matching exact ID first
if keyword is package.id:
found = package
break
# Next try matching exact name
elif keyword.lower() is package.name.lower():
found = package
break
# Try matching case insensitive ID next
elif keyword.lower() is package.id.lower():
found = package
break
# General keyword search
elif keyword.lower() in str(package).lower():
found = package
break
if found:
search_results.append(found)
return search_results
def search_flatpak(self, keyword: str, repo_name=None) -> list[AppStreamPackage]:
"""Search packages matching a keyword"""
search_results = []
keyword = keyword
if not repo_name:
for remote_name in self.remotes.keys():
search_results.extend(self.search_flatpak_repo(keyword, remote_name))
else:
if repo_name in self.remotes.keys():
search_results.extend(self.search_flatpak_repo(keyword, repo_name))
return search_results
def get_all_apps(self, repo_name=None) -> list[AppStreamPackage]:
"""Get all available apps from specified or all repositories"""
all_packages = []
if repo_name:
if repo_name in self.remotes:
all_packages = self.remotes[repo_name]
else:
for remote_name in self.remotes.keys():
all_packages.extend(self.remotes[remote_name])
return all_packages
def get_categories_summary(self, repo_name=None) -> dict:
"""Get a summary of all apps grouped by category"""
apps = self.get_all_apps(repo_name)
categories = {}
for app in apps:
for category in app.categories:
# Normalize category names to match our groups
normalized_category = category.lower()
# Map category to its group title
for group_name, categories_dict in self.category_groups.items():
if normalized_category in categories_dict:
display_category = categories_dict[normalized_category]
break
else:
display_category = normalized_category.title()
if display_category not in categories:
categories[display_category] = []
categories[display_category].append(app)
return categories
def get_subcategories_summary(self, repo_name=None) -> list[tuple[str, str, list[AppStreamPackage]]]:
"""Get a summary of all apps grouped by category and subcategory."""
apps = self.get_all_apps(repo_name)
subcategories = []
# Process each category and its subcategories
for category, subcategories_dict in self.subcategory_groups.items():
for subcategory, title in subcategories_dict.items():
apps_in_subcategory = []
for app in apps:
if category in app.categories and subcategory in app.categories:
apps_in_subcategory.append(app)
if apps_in_subcategory:
subcategories.append((category, subcategory, apps_in_subcategory))
return subcategories
def get_installed_apps(self, system=False) -> list[tuple[str, str, str]]:
"""Get a list of all installed Flatpak applications with their repository source"""
installed_refs = []
installation = get_installation(system)
def process_installed_refs(inst: Flatpak.Installation, system=False):
for ref in inst.list_installed_refs():
app_id = ref.get_name()
remote_name = ref.get_origin()
if system is False:
installed_refs.append((app_id, remote_name, "user"))
else:
installed_refs.append((app_id, remote_name, "system"))
# Process both system-wide and user installations
process_installed_refs(installation, system)
# Remove duplicates while maintaining order
seen = set()
unique_installed = [(ref, repo, repo_type) for ref, repo, repo_type in installed_refs
if not (ref in seen or seen.add(ref))]
return unique_installed
def check_updates(self, system=False) -> list[tuple[str, str, str]]:
"""Check for available updates for installed Flatpak applications"""
updates = []
installation = get_installation(system)
def check_updates_for_install(inst: Flatpak.Installation, system=False):
for ref in inst.list_installed_refs_for_update(None):
app_id = ref.get_name()
# Get remote name from the installation
remote_name = ref.get_origin()
if system is False:
updates.append((app_id, remote_name, "user"))
else:
updates.append((app_id, remote_name, "system"))
# Process both system-wide and user installations
check_updates_for_install(installation, system)
return updates
def fetch_flathub_category_apps(self, category):
"""Fetch applications from Flathub API for the specified category."""
try:
# URL encode the category to handle special characters
encoded_category = quote_plus(category)
# Determine the base URL based on category type
if category in self.category_groups['collections']:
url = f"https://flathub.org/api/v2/collection/{encoded_category}"
else:
url = f"https://flathub.org/api/v2/collection/category/{encoded_category}"
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
# If this is a collections category, save it to our collections database
if category in self.category_groups['collections']:
if not hasattr(self, 'collections_db'):
self.collections_db = []
self.collections_db.append({
'category': category,
'data': data
})
return data
else:
print(f"Failed to fetch apps: Status code {response.status_code}")
return None
except requests.RequestException as e:
print(f"Error fetching apps: {str(e)}")
return None
def save_collections_data(self, filename='collections_data.json'):
"""Save all collected collections data to a JSON file."""
if not hasattr(self, 'collections_db') or not self.collections_db:
return
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.collections_db, f, indent=2, ensure_ascii=False)
except IOError as e:
print(f"Error saving collections data: {str(e)}")
def update_collection_results(self, new_collection_results):
"""Update search results by replacing existing items and adding new ones."""
# Create a set of existing app_ids for efficient lookup
existing_app_ids = {app.id for app in self.collection_results}
# Create a list to store the updated results
updated_results = []
# First add all existing results
updated_results.extend(self.collection_results)
# Add new results, replacing any existing ones
for new_result in new_collection_results:
app_id = new_result.id
if app_id in existing_app_ids:
# Replace existing result
for i, existing in enumerate(updated_results):
if existing.id == app_id:
updated_results[i] = new_result
break
else:
# Add new result
updated_results.append(new_result)
self.collection_results = updated_results
def fetch_flathub_subcategory_apps(self, category: str, subcategory: str) -> dict|None:
"""Fetch applications from Flathub API for the specified category and subcategory."""
try:
# URL encode the category and subcategory to handle special characters
encoded_category = quote_plus(category)
encoded_subcategory = quote_plus(subcategory)
# Construct the API URL for subcategories
url = f"https://flathub.org/api/v2/collection/category/{encoded_category}/subcategories?subcategory={encoded_subcategory}"
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
return data
else:
print(f"Failed to fetch apps: Status code {response.status_code}")
return None
except requests.RequestException as e:
print(f"Error fetching apps: {str(e)}")
return None
def save_subcategories_data(self, filename='subcategories_data.json'):
"""Save all collected subcategories data to a JSON file."""
if not hasattr(self, 'subcategories_results') or not self.subcategories_results:
return
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.subcategories_results, f, indent=2, ensure_ascii=False)
except IOError as e:
print(f"Error saving subcategories data: {str(e)}")
def update_subcategories_data(self):
"""Fetch and store data for all subcategories."""
if not hasattr(self, 'subcategories_results'):
self.subcategories_results = []
# Process each category and its subcategories
for category, subcategories in self.subcategory_groups.items():
for subcategory, title in subcategories.items():
api_data = self.fetch_flathub_subcategory_apps(category, subcategory)
if api_data:
self.subcategories_results.append({
'category': category,
'subcategory': subcategory,
'data': api_data
})
# Save the collected data
self.save_subcategories_data()
def refresh_local(self, system=False):
# make sure to reset these to empty before refreshing.
self.installed_results = [] # Initialize empty list
self.updates_results = [] # Initialize empty list
total_categories = sum(len(categories) for categories in self.category_groups.values())
current_category = 0
# Search for each app in local repositories
searcher = get_reposearcher(system)
search_result = []
for group_name, categories in self.category_groups.items():
# Process categories one at a time to keep GUI responsive
for category, title in categories.items():
if "installed" in category:
installed_apps = searcher.get_installed_apps(system)
for app_id, repo_name, repo_type in installed_apps:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.installed_results.extend(search_result)
elif "updates" in category:
updates = searcher.check_updates(system)
for repo_name, app_id, repo_type in updates:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.updates_results.extend(search_result)
# Update progress bar
self.refresh_progress = (current_category / total_categories) * 100
# make sure to reset these to empty before refreshing.
return self.installed_results, self.updates_results
def retrieve_metadata(self, system=False):
"""Retrieve and refresh metadata for Flatpak repositories."""
self._initialize_metadata()
if not check_internet():
return self._handle_offline_mode()
searcher = get_reposearcher(system, True)
self.all_apps = searcher.get_all_apps()
return self._process_categories(searcher, system)
def _initialize_metadata(self):
"""Initialize empty lists for metadata storage."""
self.category_results = []
self.collection_results = []
self.subcategories_results = []
self.installed_results = []
self.updates_results = []
self.all_apps = []
def _handle_offline_mode(self):
"""Handle metadata retrieval when offline."""
json_path = "collections_data.json"
try:
with open(json_path, 'r', encoding='utf-8') as f:
collections_data = json.load(f)
return self._process_offline_data(collections_data)
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Error loading offline data: {str(e)}")
return None, [], [], [], []
# Also load subcategories data
subcategories_path = "subcategories_data.json"
try:
with open(subcategories_path, 'r', encoding='utf-8') as f:
subcategories_data = json.load(f)
self.subcategories_results = subcategories_data
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Error loading subcategories data: {str(e)}")
self.subcategories_results = []
def _process_offline_data(self, collections_data):
"""Process cached collections data when offline."""
for collection in collections_data:
category = collection['category']
if category in self.category_groups['collections']:
apps = [app['app_id'] for app in collection['data'].get('hits', [])]
for app_id in apps:
search_result = self.search_flatpak(app_id, 'flathub')
self.collection_results.extend(search_result)
return self._get_current_results()
def _process_categories(self, searcher, system=False):
"""Process categories and retrieve metadata."""
total_categories = sum(len(categories) for categories in self.category_groups.values())
current_category = 0
for group_name, categories in self.category_groups.items():
for category, title in categories.items():
if category not in self.category_groups['system']:
self._process_category(searcher, category, current_category, total_categories)
else:
self._process_system_category(searcher, category, system)
current_category += 1
if self._should_refresh():
self.update_subcategories_data()
return self._get_current_results()
def _process_category(self, searcher, category, current_category, total_categories):
"""Process a single category and retrieve its metadata."""
json_path = "collections_data.json"
try:
with open(json_path, 'r', encoding='utf-8') as f:
collections_data = json.load(f)
self._update_from_collections(collections_data, category)
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Error loading collections data: {str(e)}")
if self._should_refresh():
self._refresh_category_data(searcher, category)
self.refresh_progress = (current_category / total_categories) * 100
def _update_from_collections(self, collections_data, category):
"""Update results from cached collections data."""
for collection in collections_data:
if collection['category'] == category:
apps = [app['app_id'] for app in collection['data'].get('hits', [])]
for app_id in apps:
search_result = self.search_flatpak(app_id, 'flathub')
self.collection_results.extend(search_result)
def _should_refresh(self):
"""Check if category data needs refresh."""
json_path = "collections_data.json"
try:
mod_time = os.path.getmtime(json_path)
return (time.time() - mod_time) > 168 * 3600
except OSError:
return True
def _refresh_category_data(self, searcher, category):
"""Refresh category data from Flathub API."""
try:
api_data = self.fetch_flathub_category_apps(category)
if api_data:
apps = api_data['hits']
for app in apps:
app_id = app['app_id']
search_result = searcher.search_flatpak(app_id, 'flathub')
if category in self.category_groups['collections']:
self.update_collection_results(search_result)
else:
self.category_results.extend(search_result)
except requests.RequestException as e:
logger.error(f"Error refreshing category {category}: {str(e)}")
def _process_system_category(self, searcher, category, system=False):
"""Process system-related categories."""
if "installed" in category:
installed_apps = searcher.get_installed_apps(system)
for app_id, repo_name, repo_type in installed_apps:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.installed_results.extend(search_result)
elif "updates" in category:
updates = searcher.check_updates(system)
for app_id, repo_name, repo_type in updates:
if repo_name:
search_result = searcher.search_flatpak(app_id, repo_name)
self.updates_results.extend(search_result)
def _get_current_results(self):
"""Return current metadata results."""
return (
self.category_results,
self.collection_results,
self.subcategories_results,
self.installed_results,
self.updates_results,
self.all_apps
)
def install_flatpak(app: AppStreamPackage, repo_name=None, system=False) -> tuple[bool, str]:
"""
Install a Flatpak package.
Args:
app (AppStreamPackage): The package to install.
repo_name (str): Optional repository name to use for installation
system (Optional[bool]): Whether to operate on user or system installation
Returns:
tuple[bool, str]: (success, message)
"""
if not repo_name:
repo_name = "flathub"
installation = get_installation(system)
transaction = Flatpak.Transaction.new_for_installation(installation)
# Add the install operation
transaction.add_install(repo_name, app.flatpak_bundle, None)
# Run the transaction
try:
transaction.run()
except GLib.Error as e:
return False, f"Installation failed: {e}"
return True, f"Successfully installed {app.id}"
def install_flatpakref(ref_file, system=False):
"""Add a new repository using a .flatpakrepo file"""
# Get existing repositories
installation = get_installation(system)
if not ref_file.endswith('.flatpakref'):
return False, "Flatpak ref file path or URL must end with .flatpakref extension."
if not os.path.exists(ref_file):
return False, f"Flatpak ref file '{ref_file}' does not exist."
# Read the flatpakref file
try:
with open(ref_file, 'rb') as f:
repo_data = f.read()
except IOError as e:
return False, f"Failed to read flatpakref file: {str(e)}"
# Convert the data to GLib.Bytes
repo_bytes = GLib.Bytes.new(repo_data)
installation = get_installation(system)
transaction = Flatpak.Transaction.new_for_installation(installation)
# Add the install operation
transaction.add_install_flatpakref(repo_bytes)
# Run the transaction
try:
transaction.run()
except GLib.Error as e:
return False, f"Installation failed: {e}"
return True, f"Successfully installed {ref_file}"
def remove_flatpak(app: AppStreamPackage, repo_name=None, system=False) -> tuple[bool, str]:
"""
Remove a Flatpak package using transactions.
Args:
app (AppStreamPackage): The package to install.
system (Optional[bool]): Whether to operate on user or system installation
Returns:
Tuple[bool, str]: (success, message)
"""
if not repo_name:
repo_name = "flathub"
# Get the appropriate installation based on user parameter
installation = get_installation(system)
# Create a new transaction for removal
transaction = Flatpak.Transaction.new_for_installation(installation)
transaction.add_uninstall(app.flatpak_bundle)
# Run the transaction
try:
transaction.run()
except GLib.Error as e:
return False, f"Failed to remove {app.id}: {e}"
return True, f"Successfully removed {app.id}"
def update_flatpak(app: AppStreamPackage, repo_name=None, system=False) -> tuple[bool, str]:
"""
Remove a Flatpak package using transactions.
Args:
app (AppStreamPackage): The package to install.
system (Optional[bool]): Whether to operate on user or system installation
Returns:
Tuple[bool, str]: (success, message)
"""
if not repo_name:
repo_name = "flathub"
# Get the appropriate installation based on user parameter
installation = get_installation(system)
# Create a new transaction for removal
transaction = Flatpak.Transaction.new_for_installation(installation)
transaction.add_update(app.flatpak_bundle)
# Run the transaction
try:
transaction.run()
except GLib.Error as e:
return False, f"Failed to update {app.id}: {e}"
return True, f"Successfully updated {app.id}"
def get_installation(system=False):
if system is False:
installation = Flatpak.Installation.new_user()
else:
installation = Flatpak.Installation.new_system()
return installation
def get_reposearcher(system=False, refresh=False):
installation = get_installation(system)
searcher = AppstreamSearcher(refresh)
searcher.add_installation(installation)
return searcher
def check_internet():
"""Check if internet connection is available."""
try:
requests.head('https://flathub.org', timeout=3)
return True
except requests.ConnectionError:
return False
def repotoggle(repo, toggle=True, system=False):
"""
Enable or disable a Flatpak repository
Args:
repo (str): Name of the repository to toggle
enable (toggle): True to enable, False to disable
Returns:
tuple: (success, error_message)
"""
if not repo:
return False, "Repository name cannot be empty"
installation = get_installation(system)
try:
remote = installation.get_remote_by_name(repo)
if not remote:
return False, f"Repository '{repo}' not found."
remote.set_disabled(not toggle)
# Modify the remote's disabled status
success = installation.modify_remote(
remote,
None
)
if success:
if toggle:
message = f"Successfully enabled {repo}."
else:
message = f"Successfully disabled {repo}."
return True, message
except GLib.GError as e:
return False, f"Failed to toggle repository: {str(e)}"
return False, "Operation failed"
def repolist(system=False):
installation = get_installation(system)
repos = installation.list_remotes()
return repos
def repodelete(repo, system=False):
installation = get_installation(system)
installation.remove_remote(repo)
def repoadd(repofile, system=False):
"""Add a new repository using a .flatpakrepo file"""
# Get existing repositories
installation = get_installation(system)
existing_repos = installation.list_remotes()
if not repofile.endswith('.flatpakrepo'):
return False, "Repository file path or URL must end with .flatpakrepo extension."
if repofile_is_url(repofile):
try:
local_path = download_repo(repofile)
repofile = local_path
print(f"\nRepository added successfully: {repofile}")
except:
return False, f"Repository file '{repofile}' could not be downloaded."
if not os.path.exists(repofile):
return False, f"Repository file '{repofile}' does not exist."
# Get repository title from file name
title = os.path.basename(repofile).replace('.flatpakrepo', '')
# Check for duplicate title (case insensitive)
existing_titles = [repo.get_name().casefold() for repo in existing_repos]
if title.casefold() in existing_titles:
return False, "A repository with this title already exists."
# Read the repository file
try:
with open(repofile, 'rb') as f:
repo_data = f.read()
except IOError as e:
return False, f"Failed to read repository file: {str(e)}"
# Convert the data to GLib.Bytes
repo_bytes = GLib.Bytes.new(repo_data)
# Create a new remote from the repository file
try:
remote = Flatpak.Remote.new_from_file(title, repo_bytes)
# Get URLs and normalize them by removing trailing slashes
new_url = remote.get_url().rstrip('/')
existing_urls = [repo.get_url().rstrip('/') for repo in existing_repos]
# Check if URL already exists
if new_url in existing_urls:
return False, f"A repository with URL '{new_url}' already exists."
user = "user"
if system:
user = "system"
remote.set_gpg_verify(True)
installation.add_remote(remote, True, None)
except GLib.GError as e:
return False, f"Failed to add repository: {str(e)}"
return True, f"{remote.get_name()} repository successfully added for {user} installation."
def repofile_is_url(string):
"""Check if a string is a valid URL"""
try:
result = urlparse(string)
return all([result.scheme, result.netloc])
except:
return False
def download_repo(url):
"""Download a repository file from URL to /tmp/"""
try:
# Create a deterministic filename based on the URL
url_path = urlparse(url).path
filename = os.path.basename(url_path) or 'repo'
tmp_path = Path(tempfile.gettempdir()) / f"{filename}"
# Download the file
with requests.get(url, stream=True) as response:
response.raise_for_status()
# Write the file in chunks, overwriting if it exists
with open(tmp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return str(tmp_path)
except requests.RequestException as e:
raise argparse.ArgumentTypeError(f"Failed to download repository file: {str(e)}")
except IOError as e:
raise argparse.ArgumentTypeError(f"Failed to save repository file: {str(e)}")
def get_metadata_path(app_id: str | None, override=False, system=False) -> str:
metadata_path = ""
if override:
if system:
metadata_path = "/var/lib/flatpak/overrides/global"
if not os.path.exists(metadata_path):
os.makedirs(os.path.dirname(metadata_path), exist_ok=True)
with open(metadata_path, 'w') as f:
pass
else:
home_dir = os.path.expanduser("~")
metadata_path = f"{home_dir}/.local/share/flatpak/overrides/global"
if not os.path.exists(metadata_path):
os.makedirs(os.path.dirname(metadata_path), exist_ok=True)
with open(metadata_path, 'w') as f:
pass
elif app_id:
# Get the application's metadata file
installation = get_installation(system)
app_path = installation.get_current_installed_app(app_id).get_deploy_dir()
if not app_path:
print(f"Application {app_id} not found")
return metadata_path
metadata_path = app_path + "/metadata"
if not os.path.exists(metadata_path):
print(f"Metadata file not found for {app_id}")
return metadata_path
return metadata_path
def get_perm_key_file(app_id: str | None, override=False, system=False) -> GLib.KeyFile:
metadata_path = get_metadata_path(app_id, override, system)
# Create a new KeyFile object
key_file = GLib.KeyFile()
# Read the existing metadata
try:
key_file.load_from_file(metadata_path, GLib.KeyFileFlags.NONE)
except GLib.Error as e:
print(f"Failed to read metadata file: {str(e)}")
return None
return key_file
def add_file_permissions(app_id: str, path: str, perm_type=None, system=False) -> tuple[bool, str]:
"""
Add filesystem permissions to a Flatpak application.
Args:
app_id (str): The ID of the Flatpak application
path (str): The path to grant access to. Can be:
- "home" for home directory access
- "/path/to/directory" for custom directory access
perm_type (str): The type of permissions to remove (e.g. "filesystems", "persistent") default is "filesystems"
system (bool): Whether to modify system-wide or user installation
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(app_id, system)
perm_type = perm_type or "filesystems"
# Handle special case for home directory
if path.lower() == "host":
filesystem_path = "host"
elif path.lower() == "host-os":
filesystem_path = "host-os"
elif path.lower() == "host-etc":
filesystem_path = "host-etc"
elif path.lower() == "home":
filesystem_path = "home"
else:
# Ensure path do not ends with a trailing slash
filesystem_path = path.rstrip('/')
if not key_file.has_group("Context"):
key_file.set_string("Context", perm_type, "")
# Now get the keys
context_keys = key_file.get_keys("Context")
# Check if perm_type exists in the section
if perm_type not in str(context_keys):
# Create the key with an empty string
key_file.set_string("Context", perm_type, "")
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", perm_type)
if existing_paths is None or existing_paths == "":
# If no filesystems entry exists, create it
key_file.set_string("Context", perm_type, filesystem_path)
else:
# Split existing paths and check if our path already exists
existing_paths_list = existing_paths.split(';')
# Normalize paths for comparison (remove trailing slashes, convert to absolute paths)
normalized_new_path = os.path.abspath(filesystem_path.rstrip('/'))
normalized_existing_paths = [os.path.abspath(p.rstrip('/')) for p in existing_paths_list]
# Only add if the path doesn't already exist
if normalized_new_path not in normalized_existing_paths:
key_file.set_string("Context", perm_type,
existing_paths + filesystem_path + ";")
# Write the modified metadata back
try:
key_file.save_to_file(get_metadata_path(app_id, False, system))
except GLib.Error as e:
return False, f"Failed to save metadata file: {str(e)}"
return True, f"Successfully granted access to {path} for {app_id}"
except GLib.Error as e:
return False, f"Failed to modify permissions: {str(e)}"
def remove_file_permissions(app_id: str, path: str, perm_type=None, system=False) -> tuple[bool, str]:
"""
Remove filesystem permissions from a Flatpak application.
Args:
app_id (str): The ID of the Flatpak application
path (str): The path to revoke access to. Can be:
- "home" for home directory access
- "/path/to/directory" for custom directory access
perm_type (str): The type of permissions to remove (e.g. "filesystems", "persistent") default is "filesystems"
system (bool): Whether to modify system-wide or user installation
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(app_id, system)
perm_type = perm_type or "filesystems"
# Handle special case for home directory
if path.lower() == "host":
filesystem_path = "host"
elif path.lower() == "host-os":
filesystem_path = "host-os"
elif path.lower() == "host-etc":
filesystem_path = "host-etc"
elif path.lower() == "home":
filesystem_path = "home"
else:
# Ensure path do not ends with a trailing slash
filesystem_path = path.rstrip('/')
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", perm_type)
if existing_paths is None:
return True, f"No filesystem permissions to remove for {app_id}"
# Split existing paths and normalize them for comparison
existing_paths_list = existing_paths.split(';')
normalized_new_path = os.path.abspath(filesystem_path.rstrip('/'))
normalized_existing_paths = [os.path.abspath(p.rstrip('/')) for p in existing_paths_list]
# Only remove if the path exists
if normalized_new_path not in normalized_existing_paths:
return True, f"No permission found for {path} in {app_id}"
# Remove the path from the existing paths
filtered_paths_list = [p for p in existing_paths_list
if os.path.abspath(p.rstrip('/')) != normalized_new_path]
# Join remaining paths back together
new_permissions = ";".join(filtered_paths_list)
if new_permissions:
# Save changes
key_file.set_string("Context", perm_type, new_permissions)
else:
key_file.remove_key("Context", perm_type)
# Write the modified metadata back
try:
key_file.save_to_file(get_metadata_path(app_id, False, system))
except GLib.Error as e:
return False, f"Failed to save metadata file: {str(e)}"
return True, f"Successfully removed access to {path} for {app_id}"
except GLib.Error as e:
return False, f"Failed to modify permissions: {str(e)}"
def list_file_perms(app_id: str, system=False) -> tuple[bool, dict[str, list[str]]]|tuple[bool, dict[str, list[str]]]:
"""
List filesystem permissions for a Flatpak application.
Args:
app_id (str): The ID of the Flatpak application
system (bool): Whether to check system-wide or user installation
Returns:
tuple[bool, dict[str, list[str]]]: (success, permissions_dict)
permissions_dict contains:
- 'paths': list of filesystem paths
- 'special_paths': list of special paths (home, host, etc.)
"""
try:
key_file = get_perm_key_file(app_id, system)
# Initialize result dictionary
result = {
"paths": [],
"special_paths": []
}
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", "filesystems")
if existing_paths:
# Split and clean the paths
paths_list = [p.strip() for p in existing_paths.split(';')]
# Separate special paths from regular ones
for path in paths_list:
if path in ["home", "host", "host-os", "host-etc"]:
result["special_paths"].append(path)
else:
result["paths"].append(path)
return True, result
except GLib.Error:
return False, {"paths": [], "special_paths": []}
def list_other_perm_toggles(app_id: str, perm_type: str, system=False) -> tuple[bool, dict[str, list[str]]]|tuple[bool, dict[str, list[str]]]:
"""
List other permission toggles within "Context" for a Flatpak application.
Args:
app_id (str): The ID of the Flatpak application
perm_type (str): The type of permissions to list (e.g. "shared", "sockets", "devices", "features", "persistent")
system (bool): Whether to check system-wide or user installation
Returns:
tuple[bool, dict[str, list[str]]]: (success, permissions_dict)
permissions_dict contains:
- 'paths': list of filesystem paths
"""
try:
key_file = get_perm_key_file(app_id, system)
# Initialize result dictionary
result = {
"paths": []
}
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", perm_type)
if existing_paths:
# Split, clean, and filter out empty paths
paths_list = [p.strip() for p in existing_paths.split(';') if p.strip()]
# Add filtered paths to result
result["paths"] = paths_list
return True, result
except GLib.Error:
return False, {"paths": []}
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", perm_type)
if existing_paths:
# Split, clean, and filter out empty paths
paths_list = [p.strip() for p in existing_paths.split(';') if p.strip()]
# Add filtered paths to result
result["paths"] = paths_list
def toggle_other_perms(app_id: str, perm_type: str, option: str, enable: bool, system=False) -> tuple[bool, str]:
"""
Toggle a specific permission option for a Flatpak application.
Args:
app_id (str): The ID of the Flatpak application
perm_type (str): The type of permissions (shared, sockets, devices, features)
option (str): The specific permission to toggle
enable (bool): Whether to enable or disable the permission
system (bool): Whether to check system-wide or user installation
Returns:
bool: True if successful, False if operation failed
"""
# Get the KeyFile object
key_file = get_perm_key_file(app_id, system)
if not key_file:
return False, f"Failed to get permissions for {app_id}"
try:
perms_list = []
# Get all keys in the Context section
# Check if Context section exists
if not key_file.has_group("Context"):
key_file.set_string("Context", perm_type, "")
# Now get the keys
context_keys = key_file.get_keys("Context")
# Check if perm_type exists in the section
if perm_type not in str(context_keys):
# Create the key with an empty string
key_file.set_string("Context", perm_type, "")
# Get the existing permissions
existing_perms = key_file.get_string("Context", perm_type)
if existing_perms:
# Split into individual permissions
perms_list = [perm.strip() for perm in existing_perms.split(';') if perm.strip()]
# Toggle permission
if enable:
if option not in perms_list:
perms_list.append(option)
else:
if option in perms_list:
perms_list.remove(option)
# Join back with semicolons
new_perms = ";".join(perms_list)
# Save changes
if new_perms:
key_file.set_string("Context", perm_type, new_perms)
else:
key_file.remove_key("Context", perm_type)
key_file.save_to_file(get_metadata_path(app_id, False, system))
return True, f"Successfully {'enabled' if enable else 'disabled'} {option} for {app_id}"
except GLib.Error:
return False, f"Failed to toggle {option} for {app_id}"
def list_other_perm_values(app_id: str, perm_type: str, system=False) -> tuple[bool, dict[str, list[str]]]:
"""
List all permission values for a specified type from a Flatpak application's configuration.
Args:
app_id (str): The ID of the Flatpak application
perm_type (str): The type of permissions to list (e.g. "environment", "session_bus", "system_bus")
system (bool): Whether to check system-wide or user installation
Returns:
tuple[bool, dict[str, list[str]]]: (success, env_vars_dict)
env_vars_dict contains:
- 'paths': list of environment variables
"""
try:
key_file = get_perm_key_file(app_id, system)
# Initialize result dictionary
result = {
"paths": []
}
match perm_type.lower():
case "environment":
perm_type = "Environment"
case "session_bus":
perm_type = "Session Bus Policy"
case "system_bus":
perm_type = "System Bus Policy"
case _:
return False, {"paths": []}
# Check if section exists using has_group()
if key_file.has_group(perm_type):
# Get all keys in the section
keys = key_file.get_keys(perm_type)
# Convert ResultTuple to list of individual keys
keys = list(keys[0]) if hasattr(keys, '__iter__') else []
# Get each value and add to paths list
for key in keys:
value = key_file.get_string(perm_type, key)
if value:
result["paths"].append(f"{key}={value}")
return True, result
except GLib.Error as e:
print(f"GLib.Error: {e}")
return False, {"paths": []}
except Exception as e:
print(f"Other error: {e}")
return False, {"paths": []}
def add_permission_value(app_id: str, perm_type: str, value: str, system=False) -> tuple[bool, str]:
"""
Add a permission value to a Flatpak application's configuration.
Args:
app_id (str): The ID of the Flatpak application
perm_type (str): The type of permissions (e.g. "environment", "session_bus", "system_bus")
value (str): The complete permission value to add (e.g. "XCURSOR_PATH=/run/host/user-share/icons:/run/host/share/icons")
system (bool): Whether to modify system-wide or user installation
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(app_id, system)
# Convert perm_type to the correct format
match perm_type.lower():
case "environment":
perm_type = "Environment"
case "session_bus":
perm_type = "Session Bus Policy"
case "system_bus":
perm_type = "System Bus Policy"
case _:
return False, "Invalid permission type"
# Split the value into key and actual value
parts = value.split('=', 1)
if len(parts) != 2:
return False, "Value must be in format 'key=value'"
key, val = parts
if val not in ['talk', 'own']:
return False, "Value must be in format 'key=value' with value as 'talk' or 'own'"
# Set the value
key_file.set_string(perm_type, key, val)
# Save the changes
key_file.save_to_file(get_metadata_path(app_id, False, system))
return True, f"Successfully added {value} to {perm_type} section"
except GLib.Error as e:
return False, f"Error adding permission: {str(e)}"
def remove_permission_value(app_id: str, perm_type: str, value: str, system=False) -> tuple[bool, str]:
"""
Remove a permission value from a Flatpak application's configuration.
Args:
app_id (str): The ID of the Flatpak application
perm_type (str): The type of permissions (e.g. "environment", "session_bus", "system_bus")
value (str): The complete permission value to remove (e.g. "XCURSOR_PATH=/run/host/user-share/icons:/run/host/share/icons")
system (bool): Whether to modify system-wide or user installation
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(app_id, system)
# Convert perm_type to the correct format
match perm_type.lower():
case "environment":
perm_type = "Environment"
case "session_bus":
perm_type = "Session Bus Policy"
case "system_bus":
perm_type = "System Bus Policy"
case _:
return False, "Invalid permission type"
# Split the value into key and actual value
parts = value.split('=', 1)
if len(parts) != 2:
return False, "Value must be in format 'key=value'"
key, val = parts
# Check if section exists
if not key_file.has_group(perm_type):
return False, f"Section {perm_type} does not exist"
# Remove the value
key_file.remove_key(perm_type, key)
# Save the changes
key_file.save_to_file(get_metadata_path(app_id, False, system))
return True, f"Successfully removed {value} from {perm_type} section"
except GLib.Error as e:
return False, f"Error removing permission: {str(e)}"
def global_add_file_permissions(path: str, perm_type=None, override=True, system=False) -> tuple[bool, str]:
"""
Add filesystem permissions to all Flatpak applications globally.
Args:
path (str): The path to grant access to. Can be:
- "home" for home directory access
- "/path/to/directory" for custom directory access
perm_type (str): The type of permissions to remove (e.g. "filesystems", "persistent") default is "filesystems"
override (bool): Whether to use global metadata file instead of per-app.
system (bool): Whether to modify system-wide or user installation
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(None, override, system)
perm_type = perm_type or "filesystems"
# Handle special case for home directory
if path.lower() == "host":
filesystem_path = "host"
elif path.lower() == "host-os":
filesystem_path = "host-os"
elif path.lower() == "host-etc":
filesystem_path = "host-etc"
elif path.lower() == "home":
filesystem_path = "home"
else:
# Ensure path do not ends with a trailing slash
filesystem_path = path.rstrip('/')
if not key_file.has_group("Context"):
key_file.set_string("Context", perm_type, "")
# Now get the keys
context_keys = key_file.get_keys("Context")
# Check if perm_type exists in the section
if perm_type not in str(context_keys):
# Create the key with an empty string
key_file.set_string("Context", perm_type, "")
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", perm_type)
if existing_paths is None or existing_paths == "":
# If no filesystems entry exists, create it
key_file.set_string("Context", perm_type, filesystem_path)
else:
# Split existing paths and check if our path already exists
existing_paths_list = existing_paths.split(';')
# Normalize paths for comparison (remove trailing slashes, convert to absolute paths)
normalized_new_path = os.path.abspath(filesystem_path.rstrip('/'))
normalized_existing_paths = [os.path.abspath(p.rstrip('/')) for p in existing_paths_list]
# Only add if the path doesn't already exist
if normalized_new_path not in normalized_existing_paths:
key_file.set_string("Context", perm_type,
existing_paths + filesystem_path + ";")
# Write the modified metadata back
try:
key_file.save_to_file(get_metadata_path(None, override, system))
except GLib.Error as e:
return False, f"Failed to save metadata file: {str(e)}"
return True, f"Successfully granted access to {path} globally"
except GLib.Error as e:
return False, f"Failed to modify permissions: {str(e)}"
def global_remove_file_permissions(path: str, perm_type=None, override=True, system=False) -> tuple[bool, str]:
"""
Remove filesystem permissions from all Flatpak applications globally.
Args:
path (str): The path to revoke access to. Can be:
- "home" for home directory access
- "/path/to/directory" for custom directory access
perm_type (str): The type of permissions to remove (e.g. "filesystems", "persistent") default is "filesystems"
override (bool): Whether to use global metadata file instead of per-app.
system (bool): Whether to modify system-wide or user installation
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(None, override, system)
perm_type = perm_type or "filesystems"
# Handle special case for home directory
if path.lower() == "host":
filesystem_path = "host"
elif path.lower() == "host-os":
filesystem_path = "host-os"
elif path.lower() == "host-etc":
filesystem_path = "host-etc"
elif path.lower() == "home":
filesystem_path = "home"
else:
# Ensure path do not ends with a trailing slash
filesystem_path = path.rstrip('/')
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", perm_type)
if existing_paths is None:
return True, "No filesystem permissions to remove globally"
# Split existing paths and normalize them for comparison
existing_paths_list = existing_paths.split(';')
normalized_new_path = os.path.abspath(filesystem_path.rstrip('/'))
normalized_existing_paths = [os.path.abspath(p.rstrip('/')) for p in existing_paths_list]
# Only remove if the path exists
if normalized_new_path not in normalized_existing_paths:
return True, f"No permission found for {path} globally"
# Remove the path from the existing paths
filtered_paths_list = [p for p in existing_paths_list
if os.path.abspath(p.rstrip('/')) != normalized_new_path]
# Join remaining paths back together
new_permissions = ";".join(filtered_paths_list)
if new_permissions:
# Save changes
key_file.set_string("Context", perm_type, new_permissions)
else:
key_file.remove_key("Context", perm_type)
# Write the modified metadata back
try:
key_file.save_to_file(get_metadata_path(None, override, system))
except GLib.Error as e:
return False, f"Failed to save metadata file: {str(e)}"
return True, f"Successfully removed access to {path} globally"
except GLib.Error as e:
return False, f"Failed to modify permissions: {str(e)}"
def global_list_file_perms(override=True, system=False) -> tuple[bool, dict[str, list[str]]]|tuple[bool, dict[str, list[str]]]:
"""
List filesystem permissions for all Flatpak applications globally.
Args:
override (bool): Whether to use global metadata file instead of per-app.
system (bool): Whether to check system-wide or user installation
Returns:
tuple[bool, dict[str, list[str]]]: (success, permissions_dict)
permissions_dict contains:
- 'paths': list of filesystem paths
- 'special_paths': list of special paths (home, host, etc.)
"""
try:
key_file = get_perm_key_file(None, override, system)
# Initialize result dictionary
result = {
"paths": [],
"special_paths": []
}
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", "filesystems")
if existing_paths:
# Split and clean the paths
paths_list = [p.strip() for p in existing_paths.split(';')]
# Separate special paths from regular ones
for path in paths_list:
if path in ["home", "host", "host-os", "host-etc"]:
result["special_paths"].append(path)
else:
result["paths"].append(path)
return True, result
except GLib.Error:
return False, {"paths": [], "special_paths": []}
def global_list_other_perm_toggles(perm_type: str, override=True, system=False) -> tuple[bool, dict[str, list[str]]]|tuple[bool, dict[str, list[str]]]:
"""
List other permission toggles within "Context" for all Flatpak applications globally.
Args:
perm_type (str): The type of permissions to list (e.g. "shared", "sockets", "devices", "features", "persistent")
override (bool): Whether to use global metadata file instead of per-app.
system (bool): Whether to check system-wide or user installation
Returns:
tuple[bool, dict[str, list[str]]]: (success, permissions_dict)
permissions_dict contains:
- 'paths': list of filesystem paths
"""
try:
key_file = get_perm_key_file(None, override, system)
# Initialize result dictionary
result = {
"paths": []
}
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", perm_type)
if existing_paths:
# Split, clean, and filter out empty paths
paths_list = [p.strip() for p in existing_paths.split(';') if p.strip()]
# Add filtered paths to result
result["paths"] = paths_list
return True, result
except GLib.Error:
return False, {"paths": []}
# Get existing filesystem paths
existing_paths = key_file.get_string("Context", perm_type)
if existing_paths:
# Split, clean, and filter out empty paths
paths_list = [p.strip() for p in existing_paths.split(';') if p.strip()]
# Add filtered paths to result
result["paths"] = paths_list
def global_toggle_other_perms(perm_type: str, option: str, enable: bool, override=True, system=False) -> tuple[bool, str]:
"""
Toggle a specific permission option for all Flatpak applications globally.
Args:
perm_type (str): The type of permissions (shared, sockets, devices, features)
option (str): The specific permission to toggle
enable (bool): Whether to enable or disable the permission
override (bool): Whether to use global metadata file instead of per-app.
system (bool): Whether to check system-wide or user installation
Returns:
bool: True if successful, False if operation failed
"""
# Get the KeyFile object
key_file = get_perm_key_file(None, override, system)
if not key_file:
return False, "Failed to get permissions globally"
try:
perms_list = []
# Get all keys in the Context section
# Check if Context section exists
if not key_file.has_group("Context"):
key_file.set_string("Context", perm_type, "")
# Now get the keys
context_keys = key_file.get_keys("Context")
# Check if perm_type exists in the section
if perm_type not in str(context_keys):
# Create the key with an empty string
key_file.set_string("Context", perm_type, "")
# Get the existing permissions
existing_perms = key_file.get_string("Context", perm_type)
if existing_perms:
# Split into individual permissions
perms_list = [perm.strip() for perm in existing_perms.split(';') if perm.strip()]
# Toggle permission
if enable:
if option not in perms_list:
perms_list.append(option)
else:
if option in perms_list:
perms_list.remove(option)
# Join back with semicolons
new_perms = ";".join(perms_list)
# Save changes
if new_perms:
key_file.set_string("Context", perm_type, new_perms)
else:
key_file.remove_key("Context", perm_type)
key_file.save_to_file(get_metadata_path(None, override, system))
return True, f"Successfully {'enabled' if enable else 'disabled'} {option} globally"
except GLib.Error:
return False, f"Failed to toggle {option} globally"
def global_list_other_perm_values(perm_type: str, override=True, system=False) -> tuple[bool, dict[str, list[str]]]:
"""
List all permission values for a specified type for all Flatpak applications globally.
Args:
perm_type (str): The type of permissions to list (e.g. "environment", "session_bus", "system_bus")
override (bool): Whether to use global metadata file instead of per-app.
system (bool): Whether to check system-wide or user installation
Returns:
tuple[bool, dict[str, list[str]]]: (success, env_vars_dict)
env_vars_dict contains:
- 'paths': list of environment variables
"""
try:
key_file = get_perm_key_file(None, override, system)
# Initialize result dictionary
result = {
"paths": []
}
match perm_type.lower():
case "environment":
perm_type = "Environment"
case "session_bus":
perm_type = "Session Bus Policy"
case "system_bus":
perm_type = "System Bus Policy"
case _:
return False, {"paths": []}
# Check if section exists using has_group()
if key_file.has_group(perm_type):
# Get all keys in the section
keys = key_file.get_keys(perm_type)
# Convert ResultTuple to list of individual keys
keys = list(keys[0]) if hasattr(keys, '__iter__') else []
# Get each value and add to paths list
for key in keys:
value = key_file.get_string(perm_type, key)
if value:
result["paths"].append(f"{key}={value}")
return True, result
except GLib.Error as e:
print(f"GLib.Error: {e}")
return False, {"paths": []}
except Exception as e:
print(f"Other error: {e}")
return False, {"paths": []}
def global_add_permission_value(perm_type: str, value: str, override=True, system=False) -> tuple[bool, str]:
"""
Add a permission value to all Flatpak applications globally.
Args:
perm_type (str): The type of permissions (e.g. "environment", "session_bus", "system_bus")
value (str): The complete permission value to add (e.g. "XCURSOR_PATH=/run/host/user-share/icons:/run/host/share/icons")
override (bool): Whether to use global metadata file instead of per-app.
system (bool): Whether to modify system-wide or user installation
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(None, override, system)
# Convert perm_type to the correct format
match perm_type.lower():
case "environment":
perm_type = "Environment"
case "session_bus":
perm_type = "Session Bus Policy"
case "system_bus":
perm_type = "System Bus Policy"
case _:
return False, "Invalid permission type"
# Split the value into key and actual value
parts = value.split('=', 1)
if len(parts) != 2:
return False, "Value must be in format 'key=value'"
key, val = parts
if val not in ['talk', 'own']:
return False, "Value must be in format 'key=value' with value as 'talk' or 'own'"
# Set the value
key_file.set_string(perm_type, key, val)
# Save the changes
key_file.save_to_file(get_metadata_path(None, override, system))
return True, f"Successfully added {value} to {perm_type} section"
except GLib.Error as e:
return False, f"Error adding permission: {str(e)}"
def global_remove_permission_value(perm_type: str, value: str, override=True, system=False) -> tuple[bool, str]:
"""
Remove a permission value from all Flatpak applications globally.
Args:
perm_type (str): The type of permissions (e.g. "environment", "session_bus", "system_bus")
value (str): The complete permission value to remove (e.g. "XCURSOR_PATH=/run/host/user-share/icons:/run/host/share/icons")
override (bool): Whether to use global metadata file instead of per-app.
system (bool): Whether to modify system-wide or user installation
Returns:
tuple[bool, str]: (success, message)
"""
try:
key_file = get_perm_key_file(None, override, system)
# Convert perm_type to the correct format
match perm_type.lower():
case "environment":
perm_type = "Environment"
case "session_bus":
perm_type = "Session Bus Policy"
case "system_bus":
perm_type = "System Bus Policy"
case _:
return False, "Invalid permission type"
# Split the value into key and actual value
parts = value.split('=', 1)
if len(parts) != 2:
return False, "Value must be in format 'key=value'"
key, val = parts
# Check if section exists
if not key_file.has_group(perm_type):
return False, f"Section {perm_type} does not exist"
# Remove the value
key_file.remove_key(perm_type, key)
# Save the changes
key_file.save_to_file(get_metadata_path(None, override, system))
return True, f"Successfully removed {value} from {perm_type} section"
except GLib.Error as e:
return False, f"Error removing permission: {str(e)}"
def portal_get_permission_store():
bus = dbus.SessionBus()
portal_service = bus.get_object("org.freedesktop.impl.portal.PermissionStore", "/org/freedesktop/impl/portal/PermissionStore")
permission_store = dbus.Interface(portal_service, "org.freedesktop.impl.portal.PermissionStore")
return permission_store
def portal_set_app_permissions(portal: str, app_id: str, status_str: str):
portal_id = ""
# This is done separately incase user types "notification" instead of "notifications"
if portal.lower() in "notifications":
portal = "notifications"
status = "no"
if status_str in ["yes", "true", "1", "enable"]:
status = "yes"
match portal.lower():
case "background":
portal_id = "background"
case "notifications":
portal_id = "notification"
case "microphone":
portal = "devices"
portal_id = "microphone"
case "speakers":
portal = "devices"
portal_id = "speakers"
case "camera":
portal = "devices"
portal_id = "camera"
case "location":
portal_id = "location"
try:
permission_store = portal_get_permission_store()
permission_store.SetPermission(
portal, # Category (string)
False, # Permission status (boolean: False means 'no')
portal_id, # Permission type (string)
app_id, # App ID (string)
[dbus.String(status)] # Array of permissions (string array)
)
return True, f"Permission set to {status} for {app_id} in {portal_id} portal"
except:
return False, f"Failed to set permission for {app_id} in {portal_id} portal"
def portal_get_app_permissions(app_id: str):
permissions = portal_lookup_all()
if not permissions:
return False, f"Permission not found for {app_id} in any portal"
# Store results for each portal where we find the app
app_permissions = {}
# Iterate through all portal entries
for portal_id, permission_data in permissions:
# Extract the DBus Dictionary containing app permissions
dbus_dict = permission_data[0] # First element contains the dictionary
# Check if our target app_id exists in the dictionary
if app_id in dbus_dict:
# Get the array of values for this app
value_array = dbus_dict[app_id]
# Return the first string value (typically 'yes' or 'no')
if len(value_array) > 0:
app_permissions[portal_id] = str(value_array[0])
# Format and return the results
if app_permissions:
return True, app_permissions
return False, f"No permissions found for {app_id} in any portal"
def portal_lookup(portal: str):
try:
portal_id = ""
# This is done separately incase user types "notification" instead of "notifications"
if portal.lower() in "notifications":
portal = "notifications"
match portal.lower():
case "background":
portal_id = "background"
case "notifications":
portal_id = "notification"
case "microphone":
portal = "devices"
portal_id = "microphone"
case "speakers":
portal = "devices"
portal_id = "speakers"
case "camera":
portal = "devices"
portal_id = "camera"
case "location":
portal_id = "location"
permission_store = portal_get_permission_store()
permissions = permission_store.Lookup(
portal, # Category (string)
portal_id # Permission type (string)
)
if permissions:
return permissions
except dbus.exceptions.DBusException:
# We don't care if a lookup fails, that just means no options were set for the portal
return []
def portal_lookup_all():
portal_permissions = []
# This is done separately incase user types "notification" instead of "notifications"
portal_names = ["background", "notifications", "microphone", "speakers", "camera", "location"]
for portal in portal_names:
try:
permissions = portal_lookup(
portal # Category (string)
)
if permissions:
portal_permissions.append((portal, permissions))
except dbus.exceptions.DBusException:
# We don't care if a lookup fails, that just means no options were set for the portal
return []
return portal_permissions
def main():
parser = argparse.ArgumentParser(description='Search Flatpak packages')
parser.add_argument('--id', help='Application ID to search for')
parser.add_argument('--repo', help='Filter results to specific repository')
parser.add_argument('--list-all', action='store_true', help='List all available apps')
parser.add_argument('--categories', action='store_true', help='Show apps grouped by category')
parser.add_argument('--subcategories', action='store_true',
help='Show apps grouped by subcategory')
parser.add_argument('--list-installed', action='store_true',
help='List all installed Flatpak applications')
parser.add_argument('--check-updates', action='store_true',
help='Check for available updates')
parser.add_argument('--list-repos', action='store_true',
help='List all configured Flatpak repositories')
parser.add_argument('--add-repo', type=str, metavar='REPO_FILE',
help='Add a new repository from a .flatpakrepo file')
parser.add_argument('--remove-repo', type=str, metavar='REPO_NAME',
help='Remove a Flatpak repository')
parser.add_argument('--toggle-repo', type=str,
metavar=('ENABLE/DISABLE'),
help='Enable or disable a repository')
parser.add_argument('--install', type=str, metavar='APP_ID',
help='Install a Flatpak package')
parser.add_argument('--remove', type=str, metavar='APP_ID',
help='Remove a Flatpak package')
parser.add_argument('--update', type=str, metavar='APP_ID',
help='Update a Flatpak package')
parser.add_argument('--system', action='store_true', help='Install as system instead of user')
parser.add_argument('--refresh', action='store_true', help='Install as system instead of user')
parser.add_argument('--refresh-local', action='store_true', help='Install as system instead of user')
parser.add_argument('--add-file-perms', type=str, metavar='PATH',
help='Add file permissions to an app (e.g. any defaults: host, host-os, host-etc, home, or "/path/to/directory" for custom paths)')
parser.add_argument('--remove-file-perms', type=str, metavar='PATH',
help='Remove file permissions from an app (e.g. any defaults: host, host-os, host-etc, home, or "/path/to/directory" for custom paths)')
parser.add_argument('--list-file-perms', action='store_true',
help='List configured file permissions for an app')
parser.add_argument('--list-other-perm-toggles', type=str, metavar='PERM_NAME',
help='List configured other permission toggles for an app (e.g. "shared", "sockets", "devices", "features")')
parser.add_argument('--toggle-other-perms', type=str, metavar=('ENABLE/DISABLE'),
help='Toggle other permissions on/off (True/False)')
parser.add_argument('--perm-type', type=str,
help='Type of permission to toggle (shared, sockets, devices, features, persistent)')
parser.add_argument('--perm-option', type=str,
help='Specific permission option to toggle (e.g. network, ipc)')
parser.add_argument('--list-other-perm-values', type=str, metavar='PERM_NAME',
help='List configured other permission group values for an app (e.g. "environment", "session_bus", "system_bus")')
parser.add_argument('--add-other-perm-values', type=str, metavar='TYPE',
help='Add a permission value (e.g. "environment", "session_bus", "system_bus")')
parser.add_argument('--remove-other-perm-values', type=str, metavar='TYPE',
help='Remove a permission value (e.g. "environment", "session_bus", "system_bus")')
parser.add_argument('--perm-value', type=str, metavar='VALUE',
help='The complete permission value to add or remove (e.g. "XCURSOR_PATH=/run/host/user-share/icons:/run/host/share/icons")')
parser.add_argument('--override', action='store_true', help='Set global permission override instead of per-application')
parser.add_argument('--global-add-file-perms', type=str, metavar='PATH',
help='Add file permissions to an app (e.g. any defaults: host, host-os, host-etc, home, or "/path/to/directory" for custom paths)')
parser.add_argument('--global-remove-file-perms', type=str, metavar='PATH',
help='Remove file permissions from an app (e.g. any defaults: host, host-os, host-etc, home, or "/path/to/directory" for custom paths)')
parser.add_argument('--global-list-file-perms', action='store_true',
help='List configured file permissions for an app')
parser.add_argument('--global-list-other-perm-toggles', type=str, metavar='PERM_NAME',
help='List configured other permission toggles for an app (e.g. "shared", "sockets", "devices", "features")')
parser.add_argument('--global-toggle-other-perms', type=str, metavar=('ENABLE/DISABLE'),
help='Toggle other permissions on/off (True/False)')
parser.add_argument('--global-list-other-perm-values', type=str, metavar='PERM_NAME',
help='List configured other permission group values for an app (e.g. "environment", "session_bus", "system_bus")')
parser.add_argument('--global-add-other-perm-values', type=str, metavar='TYPE',
help='Add a permission value (e.g. "environment", "session_bus", "system_bus")')
parser.add_argument('--global-remove-other-perm-values', type=str, metavar='TYPE',
help='Remove a permission value (e.g. "environment", "session_bus", "system_bus")')
parser.add_argument('--get-app-portal-permissions', action='store_true',
help='Check specified portal permissions (e.g. "background", "notifications", "microphone", "speakers", "camera", "location") for a specified application ID.')
parser.add_argument('--get-portal-permissions', type=str, metavar='TYPE',
help='List all current portal permissions for all applications')
parser.add_argument('--get-all-portal-permissions', action='store_true',
help='List all current portal permissions for all applications')
parser.add_argument('--set-app-portal-permissions', type=str, metavar='TYPE',
help='Set specified portal permissions (e.g. "background", "notifications", "microphone", "speakers", "camera", "location") yes/no for a specified application ID.')
parser.add_argument('--portal-perm-value', type=str, metavar='TYPE',
help='Set specified portal permissions value (yes/no) for a specified application ID.')
args = parser.parse_args()
# Handle repository operations
if args.toggle_repo:
handle_repo_toggle(args)
return
if args.list_repos:
handle_list_repos(args)
return
if args.add_repo:
handle_add_repo(args)
return
if args.remove_repo:
handle_remove_repo(args)
return
# Handle package operations
searcher = get_reposearcher(args.system)
if args.install:
handle_install(args, searcher)
return
if args.remove:
handle_remove(args, searcher)
return
if args.update:
handle_update(args, searcher)
return
# Handle information operations
if args.list_installed:
handle_list_installed(args, searcher)
return
if args.check_updates:
handle_check_updates(args, searcher)
return
if args.list_all:
handle_list_all(args, searcher)
return
if args.categories:
handle_categories(args, searcher)
return
if args.subcategories:
handle_subcategories(args, searcher)
return
if args.id:
if args.add_file_perms:
handle_add_file_perms(args, searcher)
return
if args.remove_file_perms:
handle_remove_file_perms(args, searcher)
return
if args.list_file_perms:
handle_list_file_perms(args, searcher)
return
if args.list_other_perm_toggles:
handle_list_other_perm_toggles(args, searcher)
return
if args.list_other_perm_values:
handle_list_other_perm_values(args, searcher)
return
if args.toggle_other_perms:
handle_toggle_other_perms(args, searcher)
return
if args.add_other_perm_values:
handle_add_other_perm_values(args, searcher)
return
if args.remove_other_perm_values:
handle_remove_other_perm_values(args, searcher)
return
if args.get_app_portal_permissions:
handle_get_app_portal_permissions(args, searcher)
return
if args.set_app_portal_permissions:
handle_set_app_portal_permissions(args, searcher)
return
else:
handle_search(args, searcher)
return
if args.override:
if args.global_add_file_perms:
handle_global_add_file_perms(args, searcher)
return
if args.global_remove_file_perms:
handle_global_remove_file_perms(args, searcher)
return
if args.global_list_file_perms:
handle_global_list_file_perms(args, searcher)
return
if args.global_list_other_perm_toggles:
handle_global_list_other_perm_toggles(args, searcher)
return
if args.global_list_other_perm_values:
handle_global_list_other_perm_values(args, searcher)
return
if args.global_toggle_other_perms:
handle_global_toggle_other_perms(args, searcher)
return
if args.global_add_other_perm_values:
handle_global_add_other_perm_values(args, searcher)
return
if args.global_remove_other_perm_values:
handle_global_remove_other_perm_values(args, searcher)
return
else:
print("Missing options. Use -h for help.")
if args.get_all_portal_permissions:
result = portal_lookup_all()
if result:
print("\nPortal Permissions:")
print("-" * 50)
for portal_id, permissions in result:
print(f"{portal_id}: {permissions}")
else:
print("No app permissions found set for any portals")
return
if args.get_portal_permissions:
result = portal_lookup(args.get_portal_permissions)
if result:
print("\nPortal Permissions:")
print("-" * 50)
for permissions in result:
print(f"{args.get_portal_permissions}: {permissions}")
else:
print(f"No app permissions found for {args.get_portal_permissions} portal")
return
print("Missing options. Use -h for help.")
def handle_repo_toggle(args):
repo_name = args.repo
if not repo_name:
print("Error: must specify a repo.")
sys.exit(1)
get_status = args.toggle_repo.lower() in ['true', 'enable']
try:
success, message = repotoggle(repo_name, get_status, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_list_repos(args):
repos = repolist(args.system)
print("\nConfigured Repositories:")
for repo in repos:
print(f"- {repo.get_name()} ({repo.get_url()})")
def handle_add_repo(args):
try:
success, message = repoadd(args.add_repo, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_remove_repo(args):
repodelete(args.remove_repo, args.system)
print(f"\nRepository removed successfully: {args.remove_repo}")
def handle_install(args, searcher):
if args.install.endswith('.flatpakref'):
try:
success, message = install_flatpakref(args.install, args.system)
result_message = f"{message}"
except GLib.Error as e:
result_message = f"Installation of {args.install} failed: {str(e)}"
print(result_message)
else:
packagelist = searcher.search_flatpak(args.install, args.repo)
result_message = ""
for package in packagelist:
try:
success, message = install_flatpak(package, args.repo, args.system)
result_message = f"{message}"
break
except GLib.Error as e:
result_message = f"Installation of {args.install} failed: {str(e)}"
pass
print(result_message)
def handle_remove(args, searcher):
packagelist = searcher.search_flatpak(args.remove, args.repo)
result_message = ""
for package in packagelist:
try:
success, message = remove_flatpak(package, args.repo, args.system)
result_message = f"{message}"
break
except GLib.Error as e:
result_message = f"Removal of {args.remove} failed: {str(e)}"
pass
print(result_message)
def handle_update(args, searcher):
packagelist = searcher.search_flatpak(args.update, args.repo)
result_message = ""
for package in packagelist:
try:
success, message = update_flatpak(package, args.repo, args.system)
result_message = f"{message}"
break
except GLib.Error as e:
result_message = f"Update of {args.update} failed: {str(e)}"
pass
print(result_message)
def handle_list_installed(args, searcher):
installed_apps = searcher.get_installed_apps(args.system)
print(f"\nInstalled Flatpak Applications ({len(installed_apps)}):")
for app_id, repo_name, repo_type in installed_apps:
print(f"{app_id} (Repository: {repo_name}, Installation: {repo_type})")
def handle_check_updates(args, searcher):
updates = searcher.check_updates(args.system)
print(f"\nAvailable Updates ({len(updates)}):")
for repo_name, app_id, repo_type in updates:
print(f"{app_id} (Repository: {repo_name}, Installation: {repo_type})")
def handle_list_all(args, searcher):
apps = searcher.get_all_apps(args.repo)
for app in apps:
details = app.get_details()
print(f"Name: {details['name']}")
print(f"Categories: {', '.join(details['categories'])}")
print("-" * 50)
def handle_categories(args, searcher):
categories = searcher.get_categories_summary(args.repo)
for category, apps in categories.items():
print(f"\n{category.upper()}:")
for app in apps:
print(f" - {app.name} ({app.id})")
def handle_subcategories(args, searcher):
"""Handle showing apps grouped by subcategory."""
subcategories = searcher.get_subcategories_summary(args.repo)
for category, subcategory, apps in subcategories:
print(f"\n{category.upper()} > {subcategory.upper()}:")
for app in apps:
print(f" - {app.name} ({app.id})")
def handle_add_file_perms(args, searcher):
try:
success, message = add_file_permissions(args.id, args.add_file_perms, args.perm_type, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_remove_file_perms(args, searcher):
try:
success, message = remove_file_permissions(args.id, args.remove_file_perms, args.perm_type, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_list_file_perms(args, searcher):
try:
success, message = list_file_perms(args.id, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_list_other_perm_toggles(args, searcher):
try:
success, message = list_other_perm_toggles(args.id, args.list_other_perm_toggles, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_toggle_other_perms(args, searcher):
if not args.perm_type:
print("Error: must specify --perm-type")
return
if not args.perm_option:
print("Error: must specify --perm-option")
return
get_status = args.toggle_other_perms.lower() in ['true', 'enable']
try:
success, message = toggle_other_perms(args.id, args.perm_type, args.perm_option, get_status, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_list_other_perm_values(args, searcher):
try:
success, message = list_other_perm_values(args.id, args.list_other_perm_values, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_add_other_perm_values(args, searcher):
if not args.id:
print("Error: must specify --id")
return
if not args.add_other_perm_values:
print("Error: must specify which perm value")
return
if not args.perm_value:
print("Error: must specify --perm-value")
return
try:
success, message = add_permission_value(args.id, args.add_other_perm_values, args.perm_value, args.system)
print(message)
except GLib.Error as e:
print(f"{str(e)}")
def handle_remove_other_perm_values(args, searcher):
if not args.id:
print("Error: must specify --id")
return
if not args.remove_other_perm_values:
print("Error: must specify which perm value")
return
if not args.perm_value:
print("Error: must specify --perm-value")
return
try:
success, message = remove_permission_value(args.id, args.remove_other_perm_values, args.perm_value, args.system)
print(message)
except GLib.Error as e:
print(f"{str(e)}")
def handle_global_add_file_perms(args, searcher):
try:
success, message = global_add_file_permissions(args.global_add_file_perms, True, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_global_remove_file_perms(args, searcher):
try:
success, message = global_remove_file_permissions(args.global_remove_file_perms, True, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_global_list_file_perms(args, searcher):
try:
success, message = global_list_file_perms(True, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_global_list_other_perm_toggles(args, searcher):
try:
success, message = global_list_other_perm_toggles(args.global_list_other_perm_toggles, True, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_global_toggle_other_perms(args, searcher):
if not args.perm_type:
print("Error: must specify --perm-type")
return
if not args.perm_option:
print("Error: must specify --perm-option")
return
get_status = args.global_toggle_other_perms.lower() in ['true', 'enable']
try:
success, message = global_toggle_other_perms(args.perm_type, args.perm_option, get_status, True, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_global_list_other_perm_values(args, searcher):
try:
success, message = global_list_other_perm_values(args.global_list_other_perm_values, True, args.system)
print(f"{message}")
except GLib.Error as e:
print(f"{str(e)}")
def handle_global_add_other_perm_values(args, searcher):
if not args.global_add_other_perm_values:
print("Error: must specify which perm value")
return
if not args.perm_value:
print("Error: must specify --perm-value")
return
try:
success, message = global_add_permission_value(args.global_add_other_perm_values, args.perm_value, True, args.system)
print(message)
except GLib.Error as e:
print(f"{str(e)}")
def handle_global_remove_other_perm_values(args, searcher):
if not args.global_remove_other_perm_values:
print("Error: must specify which perm value")
return
if not args.perm_value:
print("Error: must specify --perm-value")
return
try:
success, message = global_remove_permission_value(args.global_remove_other_perm_values, args.perm_value, True, args.system)
print(message)
except GLib.Error as e:
print(f"{str(e)}")
def handle_set_app_portal_permissions(args, searcher):
if not args.id:
print("Error: must specify --id")
return
if not args.set_app_portal_permissions:
print("Error: must specify which portal")
return
if not args.portal_perm_value:
print("Error: must specify --portal-perm-value")
return
if args.portal_perm_value.lower() in ['true', 'enable', 'yes', '1']:
status_str = "yes"
else:
status_str = "no"
try:
success, message = portal_set_app_permissions(args.set_app_portal_permissions, args.id, status_str)
print(f"{message}")
except dbus.exceptions.DBusException as e:
print(f"{str(e)}")
def handle_get_app_portal_permissions(args, searcher):
if not args.id:
print("Error: must specify --id")
return
try:
success, message = portal_get_app_permissions(args.id)
print(f"{message}")
except dbus.exceptions.DBusException as e:
print(f"{str(e)}")
def handle_search(args, searcher):
if args.repo:
search_results = searcher.search_flatpak(args.id, args.repo)
else:
search_results = searcher.search_flatpak(args.id)
if search_results:
for package in search_results:
details = package.get_details()
print(f"Name: {details['name']}")
print(f"ID: {details['id']}")
print(f"Kind: {details['kind']}")
print(f"Summary: {details['summary']}")
print(f"Description: {details['description']}")
print(f"Version: {details['version']}")
print(f"Icon URL: {details['icon_url']}")
print(f"Icon PATH 128x128: {details['icon_path_128']}")
print(f"Icon PATH 64x64: {details['icon_path_64']}")
print(f"Icon FILE: {details['icon_filename']}")
print(f"Developer: {details['developer']}")
print(f"Categories: {details['categories']}")
urls = details['urls']
print(f"Donation URL: {urls['donation']}")
print(f"Homepage URL: {urls['homepage']}")
print(f"Bug Tracker URL: {urls['bugtracker']}")
print(f"Bundle ID: {details['bundle_id']}")
print(f"Match Type: {details['match_type']}")
print(f"Repo: {details['repo']}")
print("-" * 50)
if __name__ == "__main__":
main()