debian-koji/devtools/check-api
2024-06-10 09:02:56 +02:00

358 lines
10 KiB
Python
Executable file

#!/usr/bin/python
import inspect
import json
import os
import sys
import types
from collections import OrderedDict
# import koji code from our checkout
kojitop = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, kojitop)
import koji
from kojihub import auth, kojixmlrpc, kojihub
import koji
import koji.arch
import koji.util
import koji.xmlrpcplus
import koji.tasks
import koji.plugin
from kojihub import kojixmlrpc
def main():
    """Command-line entry point.

    Behaviour depends on the number of command-line arguments:

    * none: print the current API as JSON on stdout
    * one: compare the API dump in the given file against the current API
    * two: compare two API dumps; '.' means "the current API"

    Any other argument count is rejected with a usage message.

    Returns the process exit status: 0 when no errors were recorded,
    1 when at least one error was recorded, 2 on a usage error.
    """
    args = sys.argv[1:]
    if len(args) > 2:
        # extra arguments used to be silently ignored, and the script
        # exited 0 without having checked anything
        sys.stderr.write(f'Usage: {sys.argv[0]} [OLD_API [NEW_API]]\n')
        return 2
    if not args:
        # just print the api
        print(json.dumps(read_api(), indent=4))
    else:
        # read_fn() treats '.' as "read the current api"
        old = read_fn(args[0])
        new = read_fn(args[1]) if len(args) == 2 else read_api()
        compare(old, new)
        report()
    return 1 if ERRORS else 0
def read_fn(fn):
    """Load API data from a JSON dump file.

    fn is the path of a dump previously produced by this script. The special
    value '.' means "inspect the current checkout" and returns read_api()
    instead of reading a file.

    Returns the decoded JSON data.
    """
    if fn == '.':
        return read_api()
    # JSON interchange is UTF-8; don't depend on the locale's default encoding
    with open(fn, 'rt', encoding='utf-8') as fo:
        return json.load(fo)
def read_api():
    """Collect a description of the API of the koji code in this checkout.

    Returns an OrderedDict with three keys:

    * 'version': koji.__version_info__ as a list
    * 'lib': per-module data from dump_module() for the library modules
    * 'rpc': per-call data from dump_func() for the hub rpc registry
    """
    # we use OrderedDicts so that our json output diffs nicely
    data = OrderedDict()
    data['version'] = list(koji.__version_info__)

    lib_modules = (koji, koji.arch, koji.util, koji.tasks, koji.xmlrpcplus, koji.plugin)
    data['lib'] = OrderedDict(
        (mod.__name__, dump_module(mod)) for mod in lib_modules
    )

    # hub rpc calls (no plugins)
    registry = kojixmlrpc.get_registry(opts={}, plugins=None)
    data['rpc'] = OrderedDict(
        (name, dump_func(registry.funcs[name])) for name in sorted(registry.funcs)
    )
    return data
def dump_module(mod):
    """Describe the public globals of a module.

    Returns an OrderedDict keyed by global name, in sorted order. Names that
    start with an underscore are left out. Each entry records the string
    form of the value's type. Values whose source file differs from the
    module's are flagged 'is_external' and not examined further. Classes and
    functions defined in the module carry the output of dump_class() or
    dump_func(). Modules are flagged 'is_module'.
    """
    mod_source = inspect.getsourcefile(mod)
    # TODO defer filtering _ to check code
    public_names = sorted(n for n in vars(mod) if not n.startswith('_'))
    info = OrderedDict()
    for name in public_names:
        value = getattr(mod, name)
        type_name = str(type(value))
        if '__future__' in type_name:
            # skip objects whose type comes from __future__ (e.g. names bound
            # by "from __future__ import ...")
            continue
        vinfo = OrderedDict(type=type_name)
        info[name] = vinfo
        try:
            is_external = inspect.getsourcefile(value) != mod_source
        except TypeError:
            # getsourcefile fails for numerous types
            is_external = False
        if is_external:
            # don't dig any deeper if it isn't defined in the module
            vinfo['is_external'] = True
        elif inspect.isclass(value):
            vinfo['is_class'] = True
            vinfo.update(dump_class(value))
        elif inspect.isfunction(value):
            vinfo['is_function'] = True
            vinfo.update(dump_func(value))
        elif inspect.ismodule(value):
            vinfo['is_module'] = True
    return info
def dump_func(func):
    """Describe the call signature of a function.

    Returns an OrderedDict. For builtins it only contains 'is_builtin',
    because there is nothing further to inspect. Otherwise it contains:

    * 'is_generator_function': True (only present for generator functions)
    * 'desc': the parameter list as a human-readable string
    * 'args': one dict per parameter holding its 'name' and, when the
      parameter has a default, the repr of that 'default'
    * 'varargs' / 'varkw': names of the *args / **kwargs parameters, or None

    Keyword-only parameters are reported through error().
    """
    info = OrderedDict()
    if inspect.isbuiltin(func):
        info['is_builtin'] = True
        # no need to dig deeper
        return info
    if inspect.isgeneratorfunction(func):
        info['is_generator_function'] = True
    sig = inspect.signature(func)
    info['desc'] = '(%s)' % ', '.join([str(x) for x in sig.parameters.values()])
    args = []
    for pname, param in sig.parameters.items():
        # identity test: "==" would call the default value's own __eq__,
        # which can return anything (or raise)
        if param.default is param.empty:
            arg = OrderedDict(name=pname)
        else:
            # default could be all sorts of things, repr is better than json here
            default = repr(param.default)
            arg = OrderedDict(name=pname, default=default)
        args.append(arg)
    info['args'] = args
    spec = inspect.getfullargspec(func)
    # FullArgSpec(args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, annotations)
    if spec.kwonlyargs or spec.kwonlydefaults:
        # we don't use these
        # the name has to come from the function itself; no name is passed in
        name = getattr(func, '__name__', repr(func))
        error(f'Found keyword-only args in rpc call {name}')
    info['varargs'] = spec.varargs
    info['varkw'] = spec.varkw
    return info
def dump_class(cls):
    """Describe the public attributes defined directly on a class.

    Returns {'members': OrderedDict} keyed by attribute name in sorted order.
    Names that start with an underscore are left out. Each entry records the
    string form of the attribute's type. Plain functions additionally carry
    'is_function' and the output of dump_func().
    """
    members = OrderedDict()
    for name in sorted(n for n in vars(cls) if not n.startswith('_')):
        value = getattr(cls, name)
        vinfo = OrderedDict(type=str(type(value)))
        if inspect.isfunction(value):
            vinfo['is_function'] = True
            vinfo.update(dump_func(value))
        members[name] = vinfo
    return {'members': members}
def compare(old, new):
    """Compare two API dumps and record the differences.

    old and new are mappings as produced by read_api(), with the top-level
    keys 'version', 'lib' and 'rpc'. An unexpected set of keys on either side
    is reported through error(). A section is only compared when it exists on
    both sides, so a malformed dump yields an error report, not a KeyError.
    """
    top_keys = {'version', 'lib', 'rpc'}
    if set(old) != top_keys:
        error(f'Invalid keys in old data {list(old)}')
    if set(new) != top_keys:
        error(f'Invalid keys in new data {list(new)}')
    sections = (
        ('version', compare_version),
        ('lib', compare_lib),
        ('rpc', compare_rpc),
    )
    for key, check in sections:
        # a missing section was already reported by the key checks above
        if key in old and key in new:
            check(old[key], new[key])
# set to True by compare_version() when the first two version fields increase
NEW_MAJOR = False


def compare_version(old, new):
    """Compare two version sequences and note how the version moved.

    An increase is printed to stdout and counts as "major" when the first
    two fields increased, in which case NEW_MAJOR is also set. A decrease is
    reported through error(). Equal versions produce no output.
    """
    global NEW_MAJOR
    if old > new:
        error(f'Version DECREASED: {old} -> {new}')
        return
    if old < new:
        is_major = old[:2] < new[:2]
        if is_major:
            NEW_MAJOR = True
            print(f'Major version increase {old} -> {new}')
        else:
            print(f'Minor version increase {old} -> {new}')
def compare_lib(old, new):
    """Compare the 'lib' sections of two API dumps.

    Both arguments map module names to module data. Added modules are
    reported through warn() and dropped modules through error(). Modules
    present on both sides are handed to compare_mod().
    """
    old_names, new_names = set(old), set(new)
    for name in sorted(new_names - old_names):
        warn(f'Added module: {name}')
    for name in sorted(old_names - new_names):
        error(f'Dropped module: {name}')
    for name in sorted(old_names & new_names):
        compare_mod(name, old[name], new[name])
def compare_mod(mod, old, new):
    """Compare the globals of one module across two API dumps.

    mod is the module name used in messages; old and new map global names to
    their data. Added globals are reported through warn() and dropped globals
    through error(). Globals present on both sides are handed to
    compare_mod_global().
    """
    old_names, new_names = set(old), set(new)
    for name in sorted(new_names - old_names):
        warn(f'Added module global: {mod}.{name}')
    for name in sorted(old_names - new_names):
        # TODO figure out a way to distinguish deprecations
        error(f'Dropped module global: {mod}.{name}')
    for name in sorted(old_names & new_names):
        compare_mod_global(mod, name, old[name], new[name])
def compare_mod_global(mod, name, old, new):
    """Compare one module global across two API dumps.

    A change in the recorded type is reported through error() and ends the
    comparison. Otherwise entries flagged as functions go to
    compare_function() and entries flagged as classes go to compare_class().
    The flags are read from the old entry.
    """
    type_changed = old['type'] != new['type']
    if type_changed:
        error(f'{mod}.{name} changed type: {old["type"]} -> {new["type"]}')
        # this prevents further comparison
        return
    desc = f'{mod}.{name}'
    if old.get('is_function'):
        compare_function(desc, old, new)
        return
    if old.get('is_class'):
        compare_class(desc, old, new)
def compare_class(cls, old, new):
    """Compare the members of one class across two API dumps.

    cls is the class name used in messages; old and new each hold a
    'members' mapping. Added members are reported through warn() and dropped
    members through error(). Members present on both sides are handed to
    compare_class_var().
    """
    old_members = old['members']
    new_members = new['members']
    old_names, new_names = set(old_members), set(new_members)
    for name in sorted(new_names - old_names):
        warn(f'Added class variable: {cls}.{name}')
    for name in sorted(old_names - new_names):
        # TODO figure out a way to distinguish deprecations
        error(f'Dropped class variable: {cls}.{name}')
    for name in sorted(old_names & new_names):
        compare_class_var(cls, name, old_members[name], new_members[name])
def compare_class_var(cls, name, old, new):
    """Compare one class member across two API dumps.

    A change in the recorded type is reported through error() and ends the
    comparison. Members flagged as functions in the old entry are handed to
    compare_function().
    """
    type_changed = old['type'] != new['type']
    if type_changed:
        error(f'{cls}.{name} changed type: {old["type"]} -> {new["type"]}')
        # this prevents further comparison
        return
    if old.get('is_function'):
        desc = f'{cls}.{name}'
        compare_function(desc, old, new)
def compare_rpc(old, new):
    """Compare the 'rpc' sections of two API dumps.

    Both arguments map rpc call names to signature data. Added calls are
    reported through warn() and dropped calls through error(). Calls present
    on both sides are handed to compare_rpc_call().
    """
    old_names, new_names = set(old), set(new)
    for name in sorted(new_names - old_names):
        warn(f'Added RPC call: {name}')
    for name in sorted(old_names - new_names):
        # TODO figure out a way to distinguish deprecations
        error(f'Dropped RPC call: {name}')
    for name in sorted(old_names & new_names):
        compare_rpc_call(name, old[name], new[name])
def compare_rpc_call(name, old, new):
    """Compare the signature of one rpc call via compare_function()."""
    compare_function(f'rpc call {name}', old, new)
def _compare_variadic(desc, kind, old, new):
    """Compare the catch-all parameter stored under kind in old and new.

    kind is the key to inspect, 'varargs' or 'varkw'. The stored value is the
    parameter's name, or a false value when the function has none.
    """
    if not old[kind]:
        if new[kind]:
            # not really a problem, but worth noting
            warn(f'Added {kind} for {desc}: {new[kind]}')
    elif not new[kind]:
        error(f'Dropped {kind} for {desc}')
    elif old[kind] != new[kind]:
        # not really a problem
        warn(f'Renamed {kind} for {desc}: {old[kind]} -> {new[kind]}')


def compare_function(desc, old, new):
    """Compare two signature records for the same function.

    desc names the function in messages. old and new are records as produced
    by dump_func(). Compatible changes (extended args, added defaults, added
    or renamed catch-all parameters) are reported through warn(). Changes
    that can break existing callers (fewer args, renamed or reordered args,
    dropped or changed defaults, dropped catch-all parameters) are reported
    through error().

    Records without signature details (for example the builtin-only record
    from dump_func(), or an entry marked external by dump_module()) cannot be
    compared. That case is skipped, with a warning when only one side lacks
    the details, where the unguarded lookup would raise KeyError.
    """
    old_has_sig = 'desc' in old
    new_has_sig = 'desc' in new
    if not (old_has_sig and new_has_sig):
        if old_has_sig != new_has_sig:
            warn(f'Cannot compare signature for {desc}: details missing on one side')
        return

    if old['desc'] != new['desc']:
        warn(f'Signature changed for {desc}:\n old: {old["desc"]}\n new: {new["desc"]}')
        # this is mainly for human readability, the code below will note errors

    _compare_variadic(desc, 'varargs', old, new)
    _compare_variadic(desc, 'varkw', old, new)

    oargs = old['args']
    nargs = new['args']

    # arg counts
    if len(nargs) < len(oargs):
        error(f'Arg count reduced for {desc}')
        # this will break rest of code, so we stop checking here
        return

    # note extended args
    added = [a['name'] for a in nargs[len(oargs):]]
    if added:
        warn(f'Args extended for {desc}: {added!r}')

    # details for overlap
    for i, (oarg, narg) in enumerate(zip(oargs, nargs)):
        if oarg['name'] != narg['name']:
            error(f'Overlapping args do not match for {desc} arg {i}')
            # this breaks rest of checks
            return
        if 'default' in oarg:
            if 'default' not in narg:
                error(f'Dropped default value for {desc} arg {oarg["name"]}')
            elif narg['default'] != oarg['default']:
                error(f'Changed default value for {desc} arg {oarg["name"]}: '
                      f'{oarg["default"]} -> {narg["default"]}')
        elif 'default' in narg:
            warn(f'Added default value for {desc} arg {oarg["name"]}')
# messages collected while comparing; report() prints how many there are and
# main() returns a non-zero status when ERRORS is not empty
WARNINGS = []
ERRORS = []


def warn(msg):
    """Record a warning and echo it, newline-terminated, to stderr."""
    WARNINGS.append(msg)
    for piece in (msg, '\n'):
        sys.stderr.write(piece)
def error(msg):
    """Record an error and echo it to stderr with an 'ERROR: ' prefix."""
    ERRORS.append(msg)
    for piece in ('ERROR: ', msg, '\n'):
        sys.stderr.write(piece)
def report():
    """Print the number of recorded warnings and errors to stdout."""
    summary = f'Got {len(WARNINGS)} warnings and {len(ERRORS)} errors'
    print(summary)
if __name__ == '__main__':
    # main() returns the exit status for the process
    sys.exit(main())