#!/usr/bin/python
"""Dump the koji library/hub API as JSON, or compare two API dumps.

Invocation (by number of command line arguments):

    (none)      print the current API as JSON on stdout
    OLD         compare the dump stored in OLD against the current API
    OLD NEW     compare two dumps; '.' means "read the current API"

Differences are written to stderr as warnings or errors.  The exit status
is 1 if any error was recorded, 2 on bad usage, and 0 otherwise.
"""

import inspect
import json
import os
import sys
import types
from collections import OrderedDict

# import koji code from our checkout
# abspath() guards against a relative __file__ (Python < 3.9 when the script
# is run from its own directory), which would otherwise yield '' here and put
# the current directory on sys.path instead of the checkout top
kojitop = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, kojitop)

import koji
from kojihub import auth, kojixmlrpc, kojihub
import koji
import koji.arch
import koji.util
import koji.xmlrpcplus
import koji.tasks
import koji.plugin
from kojihub import kojixmlrpc


def main():
    """Run the tool according to sys.argv and return the exit status."""
    nargs = len(sys.argv)
    if nargs == 1:
        # just print the api
        data = read_api()
        print(json.dumps(data, indent=4))
    elif nargs == 2:
        # read an api dump and compare to current
        old = read_fn(sys.argv[1])
        new = read_api()
        compare(old, new)
        report()
    elif nargs == 3:
        # compare two api dumps (use '.' to read current)
        old = read_fn(sys.argv[1])
        new = read_fn(sys.argv[2])
        compare(old, new)
        report()
    else:
        # previously this case silently did nothing and exited 0
        sys.stderr.write(f'Usage: {sys.argv[0]} [OLD [NEW]]\n')
        return 2
    if ERRORS:
        return 1
    else:
        return 0


def read_fn(fn):
    """Load API data from the JSON file fn, or from the live code if fn is '.'."""
    if fn == '.':
        return read_api()
    else:
        with open(fn, 'rt', encoding='utf-8') as fo:
            return json.load(fo)


def read_api():
    """Collect API data from the imported koji modules and the hub registry.

    Returns an OrderedDict with three keys:
      version - koji.__version_info__ as a list
      lib     - per module, the public (non-underscore) globals with their
                type and, for plain functions, their signature data
      rpc     - signature data for each call in the hub registry (no plugins)
    """
    data = OrderedDict()
    # we use OrderedDicts so that our json output diffs nicely

    data['version'] = list(koji.__version_info__)

    lib_modules = (koji, koji.arch, koji.util, koji.tasks, koji.xmlrpcplus, koji.plugin)
    data['lib'] = OrderedDict()
    for mod in lib_modules:
        info = data['lib'][mod.__name__] = OrderedDict()
        names = sorted(n for n in vars(mod) if not n.startswith('_'))
        for name in names:
            value = getattr(mod, name)
            _type = str(type(value))
            if '__future__' in _type:
                # skip names brought in by "from __future__ import ..."
                continue
            vinfo = OrderedDict()
            vinfo['type'] = _type
            if isinstance(value, types.FunctionType):
                vinfo.update(dump_func(value))
            info[name] = vinfo

    # hub rpc calls (no plugins)
    registry = kojixmlrpc.get_registry(opts={}, plugins=None)
    data['rpc'] = OrderedDict()
    for name in sorted(registry.funcs):
        func = registry.funcs[name]
        data['rpc'][name] = dump_func(func)

    return data


def dump_func(func):
    """Describe the signature of func as JSON-friendly data.

    Returns an OrderedDict with:
      desc    - the parameter list rendered as a string, e.g. '(a, b=1)'
      args    - a list of OrderedDicts, each with 'name' and, when the
                parameter has a default, 'default' (the repr of the value)
      varargs - name of the *args parameter, or None
      varkw   - name of the **kwargs parameter, or None

    Keyword-only arguments are reported through error().
    """
    info = OrderedDict()
    # the original referenced an undefined "name" below; use the function's own
    name = getattr(func, '__name__', repr(func))
    sig = inspect.signature(func)
    info['desc'] = '(%s)' % ', '.join([str(x) for x in sig.parameters.values()])
    args = []
    for pname, param in sig.parameters.items():
        # identity check: "==" would invoke __eq__ on arbitrary default values
        if param.default is param.empty:
            arg = OrderedDict(name=pname)
        else:
            # default could be all sorts of things, repr is better than json here
            default = repr(param.default)
            arg = OrderedDict(name=pname, default=default)
        args.append(arg)
    info['args'] = args
    spec = inspect.getfullargspec(func)
    # FullArgSpec(args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, annotations)
    if spec.kwonlyargs or spec.kwonlydefaults:
        # we don't use these
        error(f'Found keyword-only args in rpc call {name}')
    info['varargs'] = spec.varargs
    info['varkw'] = spec.varkw
    return info


def compare(old, new):
    """Compare two API dumps, recording differences via warn() and error().

    Both dumps are expected to have exactly the keys 'version', 'lib' and
    'rpc'.  Unexpected key sets are reported as errors; if a required key is
    missing from either dump the comparison stops there.
    """
    top_keys = {'version', 'lib', 'rpc'}
    if set(old) != top_keys:
        error(f'Invalid keys in old data {list(old)}')
    if set(new) != top_keys:
        error(f'Invalid keys in new data {list(new)}')
    if not (top_keys <= set(old) and top_keys <= set(new)):
        # a required section is missing, so the lookups below would fail
        return
    compare_version(old['version'], new['version'])
    compare_lib(old['lib'], new['lib'])
    compare_rpc(old['rpc'], new['rpc'])


# set by compare_version() when the first two version components increased
NEW_MAJOR = False


def compare_version(old, new):
    """Report the version change between two dumps.

    An increase in the first two components is printed as a major increase
    (and sets NEW_MAJOR); any other increase is printed as minor.  A
    decrease is an error.
    """
    global NEW_MAJOR
    if old < new:
        if old[:2] < new[:2]:
            print(f'Major version increase {old} -> {new}')
            NEW_MAJOR = True
        else:
            print(f'Minor version increase {old} -> {new}')
    elif old > new:
        error(f'Version DECREASED: {old} -> {new}')


def compare_lib(old, new):
    """Compare the 'lib' sections: added modules warn, dropped modules error."""
    names1 = set(old)
    names2 = set(new)
    added = names2 - names1
    dropped = names1 - names2
    both = names1.intersection(names2)
    for name in sorted(added):
        warn(f'Added module: {name}')
    for name in sorted(dropped):
        error(f'Dropped module: {name}')
    for name in sorted(both):
        compare_mod(name, old[name], new[name])


def compare_mod(mod, old, new):
    """Compare the globals of one module: additions warn, removals error."""
    names1 = set(old)
    names2 = set(new)
    added = names2 - names1
    dropped = names1 - names2
    both = names1.intersection(names2)
    for name in sorted(added):
        warn(f'Added module global: {mod}.{name}')
    for name in sorted(dropped):
        # TODO figure out a way to distinguish deprecations
        error(f'Dropped module global: {mod}.{name}')
    for name in sorted(both):
        compare_mod_global(mod, name, old[name], new[name])


def compare_mod_global(mod, name, old, new):
    """Compare one module global present in both dumps.

    A change of recorded type is an error and ends the comparison for this
    name.  Entries carrying signature data ('args') are compared as functions.
    """
    if old['type'] != new['type']:
        error(f'{mod}.{name} changed type: {old["type"]} -> {new["type"]}')
        # this prevents further comparison
        return
    desc = f'{mod}.{name}'
    if 'args' in old:
        compare_function(desc, old, new)


def compare_rpc(old, new):
    """Compare the 'rpc' sections: added calls warn, dropped calls error."""
    names1 = set(old)
    names2 = set(new)
    added = names2 - names1
    dropped = names1 - names2
    both = names1.intersection(names2)
    for name in sorted(added):
        warn(f'Added RPC call: {name}')
    for name in sorted(dropped):
        # TODO figure out a way to distinguish deprecations
        error(f'Dropped RPC call: {name}')
    for name in sorted(both):
        compare_rpc_call(name, old[name], new[name])


def compare_rpc_call(name, old, new):
    """Compare the signature data of one RPC call present in both dumps."""
    desc = f'rpc call {name}'
    compare_function(desc, old, new)


def _compare_variadic(desc, kind, old, new):
    """Compare the 'varargs' or 'varkw' entry (selected by kind) of a function.

    Adding or renaming the catch-all parameter is a warning; dropping it is
    an error.
    """
    old_name = old[kind]
    new_name = new[kind]
    if not old_name:
        if new_name:
            warn(f'Added {kind} for {desc}: {new_name}')
            # not really a problem, but worth noting
    elif not new_name:
        error(f'Dropped {kind} for {desc}')
    elif old_name != new_name:
        # not really a problem
        warn(f'Renamed {kind} for {desc}: {old_name} -> {new_name}')


def compare_function(desc, old, new):
    """Compare two signature records as produced by dump_func().

    desc is the human readable name used in messages.  Reported as errors:
    dropped varargs/varkw, fewer arguments, a renamed overlapping argument,
    and a dropped or changed default.  Reported as warnings: any change in
    the rendered signature, added/renamed varargs/varkw, extra trailing
    arguments, and a newly added default.
    """
    if old['desc'] != new['desc']:
        warn(f'Signature changed for {desc}:\n  old: {old["desc"]}\n  new: {new["desc"]}')
        # this is mainly for human readability, the code below will note errors

    _compare_variadic(desc, 'varargs', old, new)
    _compare_variadic(desc, 'varkw', old, new)

    oargs = old['args']
    nargs = new['args']

    # arg counts
    if len(nargs) < len(oargs):
        error(f'Arg count reduced for {desc}')
        # this will break rest of code, so we stop checking here
        return

    # note extended args
    added = [a['name'] for a in nargs[len(oargs):]]
    if added:
        warn(f'Args extended for {desc}: {added!r}')

    # details for overlap
    for i, (oarg, narg) in enumerate(zip(oargs, nargs)):
        if oarg['name'] != narg['name']:
            error(f'Overlapping args do not match for {desc} arg {i}')
            # this breaks rest of checks
            return
        if 'default' in oarg:
            if 'default' not in narg:
                error(f'Dropped default value for {desc} arg {oarg["name"]}')
            elif narg['default'] != oarg['default']:
                error(f'Changed default value for {desc} arg {oarg["name"]}: '
                      f'{oarg["default"]} -> {narg["default"]}')
        elif 'default' in narg:
            warn(f'Added default value for {desc} arg {oarg["name"]}')


# messages recorded by warn() and error(); main() exits 1 if ERRORS is non-empty
WARNINGS = []
ERRORS = []


def warn(msg):
    """Record msg as a warning and write it to stderr."""
    WARNINGS.append(msg)
    sys.stderr.write(msg)
    sys.stderr.write('\n')


def error(msg):
    """Record msg as an error and write it to stderr with an 'ERROR: ' prefix."""
    ERRORS.append(msg)
    sys.stderr.write('ERROR: ')
    sys.stderr.write(msg)
    sys.stderr.write('\n')


def report():
    """Print the number of warnings and errors recorded so far."""
    print(f'Got {len(WARNINGS)} warnings and {len(ERRORS)} errors')


if __name__ == '__main__':
    rv = main()
    sys.exit(rv)