tests: Add a check for valid snapshot urls
This pulls the list of snapshots from the rpmrepo API, greps the codebase for all uses of rpmrepo.osbuild.org that look like a snapshot name, and then checks to make sure they are still valid.
This commit is contained in:
parent
967570a2a6
commit
8ff4c0c40a
2 changed files with 191 additions and 0 deletions
11
.github/workflows/tests.yml
vendored
11
.github/workflows/tests.yml
vendored
|
|
@ -159,6 +159,17 @@ jobs:
|
|||
exit "0"
|
||||
fi
|
||||
|
||||
snapshots:
|
||||
name: "🔍 Check for valid snapshot urls"
|
||||
runs-on: ubuntu-20.04
|
||||
steps:
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
ref: ${{ github.event.pull_request.head.sha }}
|
||||
- name: Check for valid snapshot urls
|
||||
run: ./tools/check-snapshots --errors-only .
|
||||
|
||||
shellcheck:
|
||||
name: "🐚 Shellcheck"
|
||||
runs-on: ubuntu-20.04
|
||||
|
|
|
|||
180
tools/check-snapshots
Executable file
180
tools/check-snapshots
Executable file
|
|
@ -0,0 +1,180 @@
|
|||
#!/usr/bin/python3
|
||||
"""check-snapshots greps the directory tree for rpmrepo urls and checks them
|
||||
against the current snapshot list"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import time
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
|
||||
SNAPSHOTS_URL="https://rpmrepo.osbuild.org/v2/enumerate"
|
||||
SNAPSHOTS_TIMEOUT = 2 * 60
|
||||
SNAPSHOT_GREP = ["grep", "--color=never", "-or", r"http.*rpmrepo.osbuild.org.*-20[0-9]\+"]
|
||||
|
||||
def fetch_snapshots_api(url, timeout=SNAPSHOTS_TIMEOUT):
|
||||
"""Get the list of snapshots from the rpmrepo API"""
|
||||
print(f"Fetching list of snapshots from {url}")
|
||||
start = time.time()
|
||||
try:
|
||||
r = requests.get(url, timeout=timeout)
|
||||
except:
|
||||
return None
|
||||
elapsed = time.time() - start
|
||||
if r.status_code != 200:
|
||||
print(f"HTTP Response {r.status_code} from {url} after {elapsed:0.0f}s: {r.text}")
|
||||
return None
|
||||
print(f"Received snapshot list in {elapsed:0.0f}s")
|
||||
return r.json()
|
||||
|
||||
|
||||
def find_snapshot_urls(directory):
|
||||
"""grep the directory for rpmrepo snapshot urls
|
||||
|
||||
Returns a map of urls to the files they are used in.
|
||||
"""
|
||||
urls = {}
|
||||
try:
|
||||
grep_out = subprocess.run(SNAPSHOT_GREP + [directory],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
env={"LANG": "C"})
|
||||
except subprocess.CalledProcessError as e:
|
||||
print("ERROR: " + e.stderr.decode("utf-8"))
|
||||
sys.exit(1)
|
||||
|
||||
for line in grep_out.stdout.decode("utf-8").splitlines():
|
||||
try:
|
||||
file, url = line.split(":", 1)
|
||||
except ValueError:
|
||||
print(f"Problem parsing {line}")
|
||||
continue
|
||||
url = url.strip()
|
||||
if url not in urls:
|
||||
urls[url] = [file]
|
||||
else:
|
||||
urls[url].append(file)
|
||||
|
||||
return urls
|
||||
|
||||
|
||||
def check_baseurl(repo, snapshots):
|
||||
"""Check the baseurl to see if it is a valid snapshot, and if there is a newer one
|
||||
available.
|
||||
"""
|
||||
invalid = None
|
||||
newer = None
|
||||
url = urlparse(repo)
|
||||
snapshot = os.path.basename(url.path)
|
||||
|
||||
# Is this snapshot valid?
|
||||
if snapshot not in snapshots:
|
||||
invalid = f"{snapshot} is not a valid snapshot name"
|
||||
|
||||
# is this snapshot old?
|
||||
base = snapshot.rsplit("-", 1)[0]
|
||||
newest = snapshot
|
||||
for s in snapshots:
|
||||
if s.rsplit("-", 1)[0] != base:
|
||||
continue
|
||||
if s > newest:
|
||||
newest = s
|
||||
if newest != snapshot:
|
||||
newer = f"{snapshot} has a newer version - {newest}"
|
||||
|
||||
return invalid, newer
|
||||
|
||||
|
||||
def check_snapshot_urls(urls, snapshots, skip=["test/data/manifests"], errors_only=False):
|
||||
"""check the urls against the current list of snapshots
|
||||
|
||||
Returns:
|
||||
0 if all were valid and no newer snapshots are available
|
||||
2 if there were invalid snapshots
|
||||
3 if there were newer snapshots
|
||||
6 if there were invalid and newer snapshots
|
||||
"""
|
||||
# Gather up the messages for each file
|
||||
messages = {}
|
||||
ret = 0
|
||||
for url in urls:
|
||||
invalid, newer = check_baseurl(url, snapshots)
|
||||
if invalid:
|
||||
# Add this to each file's invalid message list
|
||||
for f in urls[url]:
|
||||
if any(bool(s in f) for s in skip):
|
||||
continue
|
||||
ret |= 2
|
||||
if f in messages:
|
||||
if invalid not in messages[f]["invalid"]:
|
||||
messages[f]["invalid"].append(invalid)
|
||||
else:
|
||||
messages[f] = {"invalid": [invalid], "newer": []}
|
||||
|
||||
if errors_only:
|
||||
continue
|
||||
|
||||
if newer:
|
||||
# Add this to each file's newer message list
|
||||
for f in urls[url]:
|
||||
if any(bool(s in f) for s in skip):
|
||||
continue
|
||||
ret |= 4
|
||||
if f in messages:
|
||||
if newer not in messages[f]["newer"]:
|
||||
messages[f]["newer"].append(newer)
|
||||
else:
|
||||
messages[f] = {"newer": [newer], "invalid": []}
|
||||
|
||||
# Print the messages for each file
|
||||
for f in messages:
|
||||
print(f"{f}:")
|
||||
for msg in messages[f]["invalid"]:
|
||||
print(f" ERROR: {msg}")
|
||||
for msg in messages[f]["newer"]:
|
||||
print(f" NEWER: {msg}")
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
# parse cmdline args
|
||||
def parse_args():
|
||||
parser =argparse.ArgumentParser(description="Check snapshot urls")
|
||||
parser.add_argument("--verbose")
|
||||
parser.add_argument("--timeout", type=int, default=SNAPSHOTS_TIMEOUT,
|
||||
help="How long to wait for rpmrepo snapshot list")
|
||||
parser.add_argument("--cache", help="Use a cached file for the list of rpmrepo snapshots")
|
||||
parser.add_argument("--url", default=SNAPSHOTS_URL,
|
||||
help="URL to use for the list of rpmrepo snapshots")
|
||||
parser.add_argument("--errors-only", action="store_true",
|
||||
help="Only return errors")
|
||||
parser.add_argument("directory")
|
||||
return parser.parse_args()
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
urls = find_snapshot_urls(args.directory)
|
||||
|
||||
snapshots = None
|
||||
if args.cache:
|
||||
try:
|
||||
with open(args.cache, encoding="utf8") as f:
|
||||
snapshots = json.load(f)
|
||||
except:
|
||||
print(f"No snapshots cache found at {args.cache}")
|
||||
sys.exit(1)
|
||||
else:
|
||||
snapshots = fetch_snapshots_api(args.url, args.timeout)
|
||||
if not snapshots:
|
||||
print(f"Cannot download snapshots from {args.url}")
|
||||
sys.exit(1)
|
||||
|
||||
return check_snapshot_urls(urls, snapshots, errors_only=args.errors_only)
|
||||
|
||||
|
||||
if __name__=='__main__':
|
||||
sys.exit(main())
|
||||
Loading…
Add table
Add a link
Reference in a new issue