The code in `check-snapshots` will print "No snapshots cache found at ..." regardless of the error that happens when trying to open the file. This can be misleading if e.g. the issue is permissions to open the file or the file is corrupted. So make the exception more targeted and only catch FileNotFound error and let python how the full error for the other cases. Obviously this can be done in many ways so I'm happy to tweak and e.g. keep catching all exception but print the value etc.
184 lines
5.7 KiB
Python
Executable file
184 lines
5.7 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""check-snapshots greps the directory tree for rpmrepo urls and checks them
|
|
against the current snapshot list"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
|
|
SNAPSHOTS_URL = "https://rpmrepo.osbuild.org/v2/enumerate"
|
|
SNAPSHOTS_TIMEOUT = 2 * 60
|
|
SNAPSHOT_GREP = ["grep", "--color=never", "-or", r"http.*rpmrepo.osbuild.org.*-20[0-9]\+"]
|
|
|
|
|
|
def fetch_snapshots_api(url, timeout=SNAPSHOTS_TIMEOUT):
|
|
"""Get the list of snapshots from the rpmrepo API"""
|
|
print(f"Fetching list of snapshots from {url}")
|
|
start = time.time()
|
|
try:
|
|
r = requests.get(url, timeout=timeout)
|
|
except BaseException as e: # pylint: disable=broad-exception-caught
|
|
return None
|
|
elapsed = time.time() - start
|
|
if r.status_code != 200:
|
|
print(f"HTTP Response {r.status_code} from {url} after {elapsed:0.0f}s: {r.text}")
|
|
return None
|
|
print(f"Received snapshot list in {elapsed:0.0f}s")
|
|
return r.json()
|
|
|
|
|
|
def find_snapshot_urls(directory):
|
|
"""grep the directory for rpmrepo snapshot urls
|
|
|
|
Returns a map of urls to the files they are used in.
|
|
"""
|
|
urls = {}
|
|
try:
|
|
grep_out = subprocess.run(SNAPSHOT_GREP + [directory],
|
|
check=True,
|
|
capture_output=True,
|
|
env={"LANG": "C"})
|
|
except subprocess.CalledProcessError as e:
|
|
print("ERROR: " + e.stderr.decode("utf-8"))
|
|
sys.exit(1)
|
|
|
|
for line in grep_out.stdout.decode("utf-8").splitlines():
|
|
try:
|
|
file, url = line.split(":", 1)
|
|
except ValueError:
|
|
print(f"Problem parsing {line}")
|
|
continue
|
|
url = url.strip()
|
|
if url not in urls:
|
|
urls[url] = [file]
|
|
else:
|
|
urls[url].append(file)
|
|
|
|
return urls
|
|
|
|
|
|
def check_baseurl(repo, snapshots):
|
|
"""Check the baseurl to see if it is a valid snapshot, and if there is a newer one
|
|
available.
|
|
"""
|
|
invalid = None
|
|
newer = None
|
|
url = urlparse(repo)
|
|
snapshot = os.path.basename(url.path)
|
|
|
|
# Is this snapshot valid?
|
|
if snapshot not in snapshots:
|
|
invalid = f"{snapshot} is not a valid snapshot name"
|
|
|
|
# is this snapshot old?
|
|
base = snapshot.rsplit("-", 1)[0]
|
|
newest = snapshot
|
|
for s in snapshots:
|
|
if s.rsplit("-", 1)[0] != base:
|
|
continue
|
|
if s > newest:
|
|
newest = s
|
|
if newest != snapshot:
|
|
newer = f"{snapshot} has a newer version - {newest}"
|
|
|
|
return invalid, newer
|
|
|
|
|
|
# pylint: disable=too-many-branches
|
|
def check_snapshot_urls(urls, snapshots, skip=("test/data/assemblers", "test/data/manifests", "test/data/stages"),
|
|
errors_only=False):
|
|
"""check the urls against the current list of snapshots
|
|
|
|
Returns:
|
|
0 if all were valid and no newer snapshots are available
|
|
2 if there were invalid snapshots
|
|
3 if there were newer snapshots
|
|
6 if there were invalid and newer snapshots
|
|
"""
|
|
# Gather up the messages for each file
|
|
messages = {}
|
|
ret = 0
|
|
for url in urls:
|
|
invalid, newer = check_baseurl(url, snapshots)
|
|
if invalid:
|
|
# Add this to each file's invalid message list
|
|
for f in urls[url]:
|
|
if any(bool(s in f) for s in skip):
|
|
continue
|
|
ret |= 2
|
|
if f in messages:
|
|
if invalid not in messages[f]["invalid"]:
|
|
messages[f]["invalid"].append(invalid)
|
|
else:
|
|
messages[f] = {"invalid": [invalid], "newer": []}
|
|
|
|
if errors_only:
|
|
continue
|
|
|
|
if newer:
|
|
# Add this to each file's newer message list
|
|
for f in urls[url]:
|
|
if any(bool(s in f) for s in skip):
|
|
continue
|
|
ret |= 4
|
|
if f in messages:
|
|
if newer not in messages[f]["newer"]:
|
|
messages[f]["newer"].append(newer)
|
|
else:
|
|
messages[f] = {"newer": [newer], "invalid": []}
|
|
|
|
# Print the messages for each file
|
|
for f, msgs in messages.items():
|
|
print(f"{f}:")
|
|
for msg in msgs["invalid"]:
|
|
print(f" ERROR: {msg}")
|
|
for msg in msgs["newer"]:
|
|
print(f" NEWER: {msg}")
|
|
|
|
return ret
|
|
|
|
|
|
# parse cmdline args
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description="Check snapshot urls")
|
|
parser.add_argument("--verbose")
|
|
parser.add_argument("--timeout", type=int, default=SNAPSHOTS_TIMEOUT,
|
|
help="How long to wait for rpmrepo snapshot list")
|
|
parser.add_argument("--cache", help="Use a cached file for the list of rpmrepo snapshots")
|
|
parser.add_argument("--url", default=SNAPSHOTS_URL,
|
|
help="URL to use for the list of rpmrepo snapshots")
|
|
parser.add_argument("--errors-only", action="store_true",
|
|
help="Only return errors")
|
|
parser.add_argument("directory")
|
|
return parser.parse_args()
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
urls = find_snapshot_urls(args.directory)
|
|
|
|
snapshots = None
|
|
if args.cache:
|
|
try:
|
|
with open(args.cache, encoding="utf8") as f:
|
|
snapshots = json.load(f)
|
|
except FileNotFoundError:
|
|
print(f"No snapshots cache found at {args.cache}")
|
|
sys.exit(1)
|
|
else:
|
|
snapshots = fetch_snapshots_api(args.url, args.timeout)
|
|
if not snapshots:
|
|
print(f"Cannot download snapshots from {args.url}")
|
|
sys.exit(1)
|
|
|
|
return check_snapshot_urls(urls, snapshots, errors_only=args.errors_only)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|