#!/usr/bin/python3 """check-snapshots greps the directory tree for rpmrepo urls and checks them against the current snapshot list""" import argparse import json import os import subprocess import sys import time from urllib.parse import urlparse import requests SNAPSHOTS_URL = "https://rpmrepo.osbuild.org/v2/enumerate" SNAPSHOTS_TIMEOUT = 2 * 60 SNAPSHOT_GREP = ["grep", "--color=never", "-or", r"http.*rpmrepo.osbuild.org.*-20[0-9]\+"] def fetch_snapshots_api(url, timeout=SNAPSHOTS_TIMEOUT): """Get the list of snapshots from the rpmrepo API""" print(f"Fetching list of snapshots from {url}") start = time.time() try: r = requests.get(url, timeout=timeout) except BaseException as e: # pylint: disable=broad-exception-caught print(f"Problem downloading {url}: {e}") return None elapsed = time.time() - start if r.status_code != 200: print(f"HTTP Response {r.status_code} from {url} after {elapsed:0.0f}s: {r.text}") return None print(f"Received snapshot list in {elapsed:0.0f}s") return r.json() def find_snapshot_urls(directory): """grep the directory for rpmrepo snapshot urls Returns a map of urls to the files they are used in. """ urls = {} try: grep_out = subprocess.run(SNAPSHOT_GREP + [directory], check=True, capture_output=True, env={"LANG": "C"}) except subprocess.CalledProcessError as e: print("ERROR: " + e.stderr.decode("utf-8")) sys.exit(1) for line in grep_out.stdout.decode("utf-8").splitlines(): try: file, url = line.split(":", 1) except ValueError: print(f"Problem parsing {line}") continue url = url.strip() if url not in urls: urls[url] = [file] else: urls[url].append(file) return urls def check_baseurl(repo, snapshots): """Check the baseurl to see if it is a valid snapshot, and if there is a newer one available. """ invalid = None newer = None url = urlparse(repo) snapshot = os.path.basename(url.path) # Is this snapshot valid? if snapshot not in snapshots: invalid = f"{snapshot} is not a valid snapshot name" # is this snapshot old? base = snapshot.rsplit("-", 1)[0] newest = snapshot for s in snapshots: if s.rsplit("-", 1)[0] != base: continue if s > newest: newest = s if newest != snapshot: newer = f"{snapshot} has a newer version - {newest}" return invalid, newer # pylint: disable=too-many-branches def check_snapshot_urls(urls, snapshots, skip=("test/data/assemblers", "test/data/manifests", "test/data/stages"), errors_only=False): """check the urls against the current list of snapshots Returns: 0 if all were valid and no newer snapshots are available 2 if there were invalid snapshots 3 if there were newer snapshots 6 if there were invalid and newer snapshots """ # Gather up the messages for each file messages = {} ret = 0 for url in urls: invalid, newer = check_baseurl(url, snapshots) if invalid: # Add this to each file's invalid message list for f in urls[url]: if any(bool(s in f) for s in skip): continue ret |= 2 if f in messages: if invalid not in messages[f]["invalid"]: messages[f]["invalid"].append(invalid) else: messages[f] = {"invalid": [invalid], "newer": []} if errors_only: continue if newer: # Add this to each file's newer message list for f in urls[url]: if any(bool(s in f) for s in skip): continue ret |= 4 if f in messages: if newer not in messages[f]["newer"]: messages[f]["newer"].append(newer) else: messages[f] = {"newer": [newer], "invalid": []} # Print the messages for each file for f, msgs in messages.items(): print(f"{f}:") for msg in msgs["invalid"]: print(f" ERROR: {msg}") for msg in msgs["newer"]: print(f" NEWER: {msg}") return ret # parse cmdline args def parse_args(): parser = argparse.ArgumentParser(description="Check snapshot urls") parser.add_argument("--verbose") parser.add_argument("--timeout", type=int, default=SNAPSHOTS_TIMEOUT, help="How long to wait for rpmrepo snapshot list") parser.add_argument("--cache", help="Use a cached file for the list of rpmrepo snapshots") parser.add_argument("--url", default=SNAPSHOTS_URL, help="URL to use for the list of rpmrepo snapshots") parser.add_argument("--errors-only", action="store_true", help="Only return errors") parser.add_argument("directory") return parser.parse_args() def main(): args = parse_args() urls = find_snapshot_urls(args.directory) snapshots = None if args.cache: try: with open(args.cache, encoding="utf8") as f: snapshots = json.load(f) except FileNotFoundError: print(f"No snapshots cache found at {args.cache}") sys.exit(1) else: snapshots = fetch_snapshots_api(args.url, args.timeout) if not snapshots: print(f"Cannot download snapshots from {args.url}") sys.exit(1) return check_snapshot_urls(urls, snapshots, errors_only=args.errors_only) if __name__ == '__main__': sys.exit(main())