When a requests.get() fails, the exact nature of the error is currently discarded. This commit adds a small print() to show exactly what went wrong.
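The resulting handling in fetch_snapshots_api() (shown in full below) is:

    try:
        r = requests.get(url, timeout=timeout)
    except BaseException as e:  # pylint: disable=broad-exception-caught
        print(f"Problem downloading {url}: {e}")
        return None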
#!/usr/bin/python3
"""check-snapshots greps the directory tree for rpmrepo urls and checks them
against the current snapshot list"""
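# Typical invocations (illustrative paths, not from the repo):
#   ./check-snapshots .
#   ./check-snapshots --cache snapshots.json --errors-only test/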
import argparse
import json
import os
import subprocess
import sys
import time
from urllib.parse import urlparse

import requests

SNAPSHOTS_URL = "https://rpmrepo.osbuild.org/v2/enumerate"
SNAPSHOTS_TIMEOUT = 2 * 60
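# grep pattern for rpmrepo snapshot urls, which end in a "-20..." date suffix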
SNAPSHOT_GREP = ["grep", "--color=never", "-or", r"http.*rpmrepo.osbuild.org.*-20[0-9]\+"]


def fetch_snapshots_api(url, timeout=SNAPSHOTS_TIMEOUT):
    """Get the list of snapshots from the rpmrepo API"""
    print(f"Fetching list of snapshots from {url}")
    start = time.time()
    try:
        r = requests.get(url, timeout=timeout)
    except BaseException as e:  # pylint: disable=broad-exception-caught
        print(f"Problem downloading {url}: {e}")
        return None
    elapsed = time.time() - start
    if r.status_code != 200:
        print(f"HTTP Response {r.status_code} from {url} after {elapsed:0.0f}s: {r.text}")
        return None
    print(f"Received snapshot list in {elapsed:0.0f}s")
    return r.json()


def find_snapshot_urls(directory):
    """grep the directory for rpmrepo snapshot urls

    Returns a map of urls to the files they are used in.
    """
    urls = {}
    try:
        grep_out = subprocess.run(SNAPSHOT_GREP + [directory],
                                  check=True,
                                  capture_output=True,
                                  env={"LANG": "C"})
    except subprocess.CalledProcessError as e:
        print("ERROR: " + e.stderr.decode("utf-8"))
        sys.exit(1)

    # Each line of the grep -or output looks like "<file>:<url>"
    for line in grep_out.stdout.decode("utf-8").splitlines():
        try:
            file, url = line.split(":", 1)
        except ValueError:
            print(f"Problem parsing {line}")
            continue
        url = url.strip()
        if url not in urls:
            urls[url] = [file]
        else:
            urls[url].append(file)

    return urls


def check_baseurl(repo, snapshots):
    """Check the baseurl to see if it is a valid snapshot, and if there is a newer one
    available.

    Returns a tuple of (invalid, newer) messages, each either a string or None.
    """
    invalid = None
    newer = None
    url = urlparse(repo)
    snapshot = os.path.basename(url.path)

    # Is this snapshot valid?
    if snapshot not in snapshots:
        invalid = f"{snapshot} is not a valid snapshot name"

    # Is this snapshot old? The date suffix sorts lexicographically, so plain
    # string comparison of names sharing the same base finds the newest one.
    base = snapshot.rsplit("-", 1)[0]
    newest = snapshot
    for s in snapshots:
        if s.rsplit("-", 1)[0] != base:
            continue
        if s > newest:
            newest = s
    if newest != snapshot:
        newer = f"{snapshot} has a newer version - {newest}"

    return invalid, newer


# pylint: disable=too-many-branches
def check_snapshot_urls(urls, snapshots, skip=("test/data/assemblers", "test/data/manifests", "test/data/stages"),
                        errors_only=False):
    """check the urls against the current list of snapshots

    Returns a bitmask:
        0 if all were valid and no newer snapshots are available
        2 if there were invalid snapshots
        4 if there were newer snapshots
        6 if there were both invalid and newer snapshots
    """
    # Gather up the messages for each file, mapping each file to its
    # "invalid" and "newer" message lists
    messages = {}
    ret = 0
    for url in urls:
        invalid, newer = check_baseurl(url, snapshots)
        if invalid:
            # Add this to each file's invalid message list
            for f in urls[url]:
                if any(s in f for s in skip):
                    continue
                ret |= 2
                if f in messages:
                    if invalid not in messages[f]["invalid"]:
                        messages[f]["invalid"].append(invalid)
                else:
                    messages[f] = {"invalid": [invalid], "newer": []}

        if errors_only:
            continue

        if newer:
            # Add this to each file's newer message list
            for f in urls[url]:
                if any(s in f for s in skip):
                    continue
                ret |= 4
                if f in messages:
                    if newer not in messages[f]["newer"]:
                        messages[f]["newer"].append(newer)
                else:
                    messages[f] = {"newer": [newer], "invalid": []}

    # Print the messages for each file
    for f, msgs in messages.items():
        print(f"{f}:")
        for msg in msgs["invalid"]:
            print(f"  ERROR: {msg}")
        for msg in msgs["newer"]:
            print(f"  NEWER: {msg}")

    return ret


def parse_args():
    """Parse the cmdline arguments"""
    parser = argparse.ArgumentParser(description="Check snapshot urls")
    parser.add_argument("--verbose", action="store_true",
                        help="Verbose output (currently unused)")
    parser.add_argument("--timeout", type=int, default=SNAPSHOTS_TIMEOUT,
                        help="How long to wait for rpmrepo snapshot list")
    parser.add_argument("--cache", help="Use a cached file for the list of rpmrepo snapshots")
    parser.add_argument("--url", default=SNAPSHOTS_URL,
                        help="URL to use for the list of rpmrepo snapshots")
    parser.add_argument("--errors-only", action="store_true",
                        help="Only return errors")
    parser.add_argument("directory")
    return parser.parse_args()


def main():
    """Check the rpmrepo snapshot urls used under the given directory"""
    args = parse_args()
    urls = find_snapshot_urls(args.directory)

    snapshots = None
    if args.cache:
        try:
            with open(args.cache, encoding="utf8") as f:
                snapshots = json.load(f)
        except FileNotFoundError:
            print(f"No snapshots cache found at {args.cache}")
            sys.exit(1)
    else:
        snapshots = fetch_snapshots_api(args.url, args.timeout)
        if not snapshots:
            print(f"Cannot download snapshots from {args.url}")
            sys.exit(1)

    return check_snapshot_urls(urls, snapshots, errors_only=args.errors_only)


if __name__ == '__main__':
    sys.exit(main())