yay, repository management is done

This commit is contained in:
GloriousEggroll 2025-03-23 21:47:18 -06:00
parent 8709766655
commit 57e6f363d4
2 changed files with 765 additions and 37 deletions

View file

@ -8,6 +8,11 @@ from pathlib import Path
import logging
from enum import IntEnum
import argparse
import urllib.parse
import requests
import tempfile
import os
import sys
# Set up logging
logging.basicConfig(level=logging.INFO)
@ -304,6 +309,140 @@ class AppstreamSearcher:
return updates
def reposearcher():
searcher = AppstreamSearcher()
searcher.add_installation(Flatpak.Installation.new_system())
return searcher
def repotoggle(repo, bool=True):
"""
Enable or disable a Flatpak repository
Args:
repo (str): Name of the repository to toggle
enable (bool): True to enable, False to disable
Returns:
tuple: (success, error_message)
"""
installation = Flatpak.Installation.new_system()
try:
remote = installation.get_remote_by_name(repo)
if not remote:
return False, f"Repository '{repo}' not found."
remote.set_disabled(not bool)
# Modify the remote's disabled status
success = installation.modify_remote(
remote,
None
)
if success:
if bool:
message = f"Successfully enabled {repo}."
else:
message = f"Successfully disabled {repo}."
return True, message
except GLib.GError as e:
return False, f"Failed to toggle repository: {str(e)}."
def repolist():
installation = Flatpak.Installation.new_system()
repos = installation.list_remotes()
return repos
def repodelete(repo):
installation = Flatpak.Installation.new_system()
installation.remove_remote(repo)
def repoadd(repofile):
"""Add a new repository using a .flatpakrepo file"""
# Get existing repositories
installation = Flatpak.Installation.new_system()
existing_repos = installation.list_remotes()
if not repofile.endswith('.flatpakrepo'):
return False, "Repository file path or URL must end with .flatpakrepo extension."
if repofile_is_url(repofile):
try:
local_path = download_repo(repofile)
repofile = local_path
except:
return False, f"Repository file '{repofile}' could not be downloaded."
if not os.path.exists(repofile):
return False, f"Repository file '{repofile}' does not exist."
# Get repository title from file name
title = os.path.basename(repofile).replace('.flatpakrepo', '')
# Check for duplicate title (case insensitive)
existing_titles = [repo.get_name().casefold() for repo in existing_repos]
if title.casefold() in existing_titles:
return False, "A repository with this title already exists."
# Read the repository file
try:
with open(repofile, 'rb') as f:
repo_data = f.read()
except IOError as e:
return False, f"Failed to read repository file: {str(e)}"
# Convert the data to GLib.Bytes
repo_bytes = GLib.Bytes.new(repo_data)
# Create a new remote from the repository file
try:
remote = Flatpak.Remote.new_from_file(title, repo_bytes)
# Get URLs and normalize them by removing trailing slashes
new_url = remote.get_url().rstrip('/')
existing_urls = [repo.get_url().rstrip('/') for repo in existing_repos]
# Check if URL already exists
if new_url in existing_urls:
return False, f"A repository with URL '{new_url}' already exists."
installation.add_remote(remote, True, None)
except GLib.GError as e:
return False, f"Failed to add repository: {str(e)}"
return True, None
def repofile_is_url(string):
"""Check if a string is a valid URL"""
try:
result = urllib.parse.urlparse(string)
return all([result.scheme, result.netloc])
except:
return False
def download_repo(url):
"""Download a repository file from URL to /tmp/"""
try:
# Create a deterministic filename based on the URL
url_path = urllib.parse.urlparse(url).path
filename = os.path.basename(url_path) or 'repo'
tmp_path = Path(tempfile.gettempdir()) / f"{filename}.flatpakrepo"
# Download the file
with requests.get(url, stream=True) as response:
response.raise_for_status()
# Write the file in chunks, overwriting if it exists
with open(tmp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return str(tmp_path)
except requests.RequestException as e:
raise argparse.ArgumentTypeError(f"Failed to download repository file: {str(e)}")
except IOError as e:
raise argparse.ArgumentTypeError(f"Failed to save repository file: {str(e)}")
def main():
"""Main function demonstrating Flatpak information retrieval"""
@ -316,6 +455,15 @@ def main():
help='List all installed Flatpak applications')
parser.add_argument('--check-updates', action='store_true',
help='Check for available updates')
parser.add_argument('--list-repos', action='store_true',
help='List all configured Flatpak repositories')
parser.add_argument('--add-repo', type=str, metavar='REPO_FILE',
help='Add a new repository from a .flatpakrepo file')
parser.add_argument('--remove-repo', type=str, metavar='REPO_NAME',
help='Remove a Flatpak repository')
parser.add_argument('--toggle-repo', type=str, nargs=2,
metavar=('REPO_NAME', 'ENABLE/DISABLE'),
help='Enable or disable a repository')
args = parser.parse_args()
app_id = args.id
@ -323,12 +471,41 @@ def main():
list_all = args.list_all
show_categories = args.categories
# Create AppstreamSearcher instance
searcher = AppstreamSearcher()
# Repository management operations
if args.toggle_repo:
repo_name, enable_str = args.toggle_repo
if enable_str.lower() not in ['true', 'false', 'enable', 'disable']:
print("Invalid enable/disable value. Use 'true/false' or 'enable/disable'")
sys.exit(1)
# Add installations
installation = Flatpak.Installation.new_system(None)
searcher.add_installation(installation)
enable = enable_str.lower() in ['true', 'enable']
success, message = repotoggle(repo_name, enable)
print(message)
sys.exit(0 if success else 1)
if args.list_repos:
repos = repolist()
print("\nConfigured Repositories:")
for repo in repos:
print(f"- {repo.get_name()} ({repo.get_url()})")
return
if args.add_repo:
success, error_message = repoadd(args.add_repo)
if error_message:
print(error_message)
sys.exit(1)
else:
print(f"\nRepository added successfully: {args.add_repo}")
return
if args.remove_repo:
repodelete(args.remove_repo)
print(f"\nRepository removed successfully: {args.remove_repo}")
return
# Create AppstreamSearcher instance
searcher = reposearcher()
if args.list_installed:
installed_apps = searcher.get_installed_apps()