#!/usr/bin/env python3
"""Dump the koji library / hub RPC api, or compare two api dumps.

Usage:
    (no args)       print the current api as json on stdout
    OLD             compare the dump in file OLD to the current api
    OLD NEW         compare two dumps (use '.' to read the current api)

Warnings and errors are written to stderr as they are found. The exit
status is 1 if any errors were recorded, 2 for a usage error, else 0.
"""

import inspect
import json
import os
import os.path
import subprocess
import sys
from collections import OrderedDict

# import koji code from our checkout
if os.path.exists(__file__):
    # abspath matters: for a relative __file__ such as 'devtools/check-api'
    # the double dirname would otherwise be '' and setup would be '/setup.py'
    kojitop = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
else:
    # __file__ does not name an existing file, so fall back to the
    # current directory as the top of the checkout
    kojitop = os.getcwd()
sys.path.insert(0, kojitop)
setup = os.path.join(kojitop, 'setup.py')
try:
    proc = subprocess.run([sys.executable, setup, '--version'], stdout=subprocess.PIPE)
    VERSTR = proc.stdout.decode().strip()
    VERSION = tuple([int(x) for x in VERSTR.split('.')])
    # we use this version to adapt our api scan a bit, but we record
    # the version from koji lib in the api data
except Exception:
    # best effort only: any failure to determine the version falls back to 0.0.0
    VERSTR = '0.0.0'
    VERSION = (0, 0, 0)

# these imports must come after the sys.path adjustment above
import koji  # noqa: E402
import koji.arch  # noqa: E402
import koji.plugin  # noqa: E402
import koji.tasks  # noqa: E402
import koji.util  # noqa: E402
import koji.xmlrpcplus  # noqa: E402

if VERSION >= (1, 32, 0):
    from kojihub import kojixmlrpc  # noqa: E402
else:
    from hub import kojixmlrpc  # noqa: E402
    kojixmlrpc.load_scripts({'SCRIPT_FILENAME': kojixmlrpc.__file__})


def main():
    """Run the tool according to sys.argv and return the process exit status."""
    nargs = len(sys.argv)
    if nargs == 1:
        # just print the api
        data = read_api()
        print(json.dumps(data, indent=4))
    elif nargs == 2:
        # read an api dump and compare to current
        old = read_fn(sys.argv[1])
        new = read_api()
        compare(old, new)
        report()
    elif nargs == 3:
        # compare two api dumps (use '.' to read current)
        old = read_fn(sys.argv[1])
        new = read_fn(sys.argv[2])
        compare(old, new)
        report()
    else:
        # previously this case silently did nothing and reported success
        sys.stderr.write(f'usage: {sys.argv[0]} [OLD [NEW]]\n')
        return 2
    return 1 if ERRORS else 0


def read_fn(fn):
    """Return api data from json file fn, or the current api if fn is '.'."""
    if fn == '.':
        return read_api()
    with open(fn, 'rt', encoding='utf-8') as fo:
        return json.load(fo)


def read_api():
    """Scan the imported koji modules and hub registry and return the api data.

    The result has three keys: 'version' (from koji.__version_info__, or
    [0, 0, 0] if absent), 'lib' (per-module dumps) and 'rpc' (per-call dumps).
    """
    data = OrderedDict()
    # we use OrderedDicts so that our json output diffs nicely

    data['version'] = list(getattr(koji, '__version_info__', (0, 0, 0)))

    lib_modules = (koji, koji.arch, koji.util, koji.tasks, koji.xmlrpcplus, koji.plugin)
    data['lib'] = OrderedDict()
    for mod in lib_modules:
        data['lib'][mod.__name__] = dump_module(mod)

    # hub rpc calls (no plugins)
    registry = kojixmlrpc.get_registry(opts={}, plugins=None)
    data['rpc'] = OrderedDict()
    for name in sorted(registry.funcs):
        func = registry.funcs[name]
        data['rpc'][name] = dump_func(func)

    return data


def dump_module(mod):
    """Return an OrderedDict describing the public globals of module mod.

    Each entry records the value's type string, plus flags and details for
    submodules, externally defined values, classes and functions.
    """
    info = OrderedDict()
    mod_file = inspect.getsourcefile(mod)
    names = [n for n in vars(mod) if not n.startswith('_')]
    # TODO defer filtering _ to check code
    names.sort()
    for name in names:
        value = getattr(mod, name)
        vinfo = OrderedDict()
        _type = str(type(value))
        if '__future__' in _type:
            # skip names whose type comes from __future__
            continue
        vinfo['type'] = _type
        info[name] = vinfo
        if inspect.ismodule(value):
            vinfo['is_module'] = True
            continue
        try:
            if inspect.getsourcefile(value) != mod_file:
                # don't dig any deeper if it isn't defined in the module
                vinfo['is_external'] = True
                continue
        except TypeError:
            # getsourcefile fails for numerous types
            pass
        if inspect.isclass(value):
            vinfo['is_class'] = True
            vinfo.update(dump_class(value))
        elif inspect.isfunction(value):
            vinfo['is_function'] = True
            vinfo.update(dump_func(value))
    return info


def dump_func(func):
    """Return an OrderedDict describing the signature of func.

    For builtins only 'is_builtin' is set. Otherwise the result has 'desc'
    (printable signature), 'args' (a list of dicts with 'name' and, when
    present, the repr of the 'default'), 'varargs' and 'varkw'. Keyword-only
    arguments are reported through error().
    """
    info = OrderedDict()
    name = func.__name__
    if inspect.isbuiltin(func):
        info['is_builtin'] = True
        # no need to dig deeper
        return info
    if inspect.isgeneratorfunction(func):
        info['is_generator_function'] = True
    sig = inspect.signature(func)
    info['desc'] = '(%s)' % ', '.join([str(x) for x in sig.parameters.values()])
    args = []
    for pname, param in sig.parameters.items():
        # Parameter.empty is a sentinel, so test identity rather than equality
        if param.default is param.empty:
            arg = OrderedDict(name=pname)
        else:
            # default could be all sorts of things, repr is better than json here
            default = repr(param.default)
            arg = OrderedDict(name=pname, default=default)
        args.append(arg)
    info['args'] = args
    spec = inspect.getfullargspec(func)
    # FullArgSpec(args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, annotations)
    if spec.kwonlyargs or spec.kwonlydefaults:
        # we don't use these
        error(f'Found keyword-only args in rpc call {name}')
    info['varargs'] = spec.varargs
    info['varkw'] = spec.varkw
    return info


def dump_class(cls):
    """Return {'members': ...} describing the public attributes set on cls itself."""
    members = OrderedDict()
    names = [n for n in vars(cls) if not n.startswith('_')]
    names.sort()
    for name in names:
        value = getattr(cls, name)
        vinfo = OrderedDict()
        vinfo['type'] = str(type(value))
        if inspect.isfunction(value):
            vinfo['is_function'] = True
            vinfo.update(dump_func(value))
        members[name] = vinfo
    return {'members': members}


def compare(old, new):
    """Compare two api dumps, recording problems via warn() and error()."""
    top_keys = {'version', 'lib', 'rpc'}
    usable = True
    for label, data in (('old', old), ('new', new)):
        if set(data) != top_keys:
            error(f'Invalid keys in {label} data {list(data)}')
            if not top_keys.issubset(data):
                # a required section is missing, so the comparisons below
                # would fail with a KeyError
                usable = False
    if not usable:
        return
    compare_version(old['version'], new['version'])
    compare_lib(old['lib'], new['lib'])
    compare_rpc(old['rpc'], new['rpc'])


NEW_MAJOR = False


def compare_version(old, new):
    """Report a version change; a decrease is an error.

    Sets the global NEW_MAJOR when the first two version fields increased.
    """
    global NEW_MAJOR
    if old < new:
        if old[:2] < new[:2]:
            print(f'Major version increase {old} -> {new}')
            NEW_MAJOR = True
        else:
            print(f'Minor version increase {old} -> {new}')
    elif old > new:
        error(f'Version DECREASED: {old} -> {new}')


def compare_lib(old, new):
    """Compare the 'lib' sections: added modules warn, dropped modules error."""
    names1 = set(old)
    names2 = set(new)
    added = names2 - names1
    dropped = names1 - names2
    both = names1.intersection(names2)
    for name in sorted(added):
        warn(f'Added module: {name}')
    for name in sorted(dropped):
        error(f'Dropped module: {name}')
    for name in sorted(both):
        compare_mod(name, old[name], new[name])


def compare_mod(mod, old, new):
    """Compare the globals of one module between two dumps."""
    names1 = set(old)
    names2 = set(new)
    added = names2 - names1
    dropped = names1 - names2
    both = names1.intersection(names2)
    for name in sorted(added):
        warn(f'Added module global: {mod}.{name}')
    for name in sorted(dropped):
        if old[name].get('is_module'):
            error(f'Dropped module import {mod}.{name}')
        elif old[name].get('is_external'):
            error(f'Dropped imported global {mod}.{name}')
        else:
            # TODO figure out a way to distinguish deprecations
            error(f'Dropped module global: {mod}.{name}')
    for name in sorted(both):
        compare_mod_global(mod, name, old[name], new[name])


def compare_mod_global(mod, name, old, new):
    """Compare one module global; functions and classes are compared in depth."""
    if old['type'] != new['type']:
        if old.get('is_external'):
            # not from our code, so just warn
            warn(f'{mod}.{name} (external) changed type: {old["type"]} -> {new["type"]}')
        else:
            error(f'{mod}.{name} changed type: {old["type"]} -> {new["type"]}')
        # this prevents further comparison
        return
    desc = f'{mod}.{name}'
    if old.get('is_function'):
        compare_function(desc, old, new)
    elif old.get('is_class'):
        compare_class(desc, old, new)


def compare_class(cls, old, new):
    """Compare the members of one class between two dumps."""
    names1 = set(old['members'])
    names2 = set(new['members'])
    added = names2 - names1
    dropped = names1 - names2
    both = names1.intersection(names2)
    for name in sorted(added):
        warn(f'Added class variable: {cls}.{name}')
    for name in sorted(dropped):
        # TODO figure out a way to distinguish deprecations
        error(f'Dropped class variable: {cls}.{name}')
    for name in sorted(both):
        compare_class_var(cls, name, old['members'][name], new['members'][name])


def compare_class_var(cls, name, old, new):
    """Compare one class member; functions get a signature comparison."""
    if old['type'] != new['type']:
        error(f'{cls}.{name} changed type: {old["type"]} -> {new["type"]}')
        # this prevents further comparison
        return
    desc = f'{cls}.{name}'
    if old.get('is_function'):
        compare_function(desc, old, new)


def compare_rpc(old, new):
    """Compare the 'rpc' sections: added calls warn, dropped calls error."""
    names1 = set(old)
    names2 = set(new)
    added = names2 - names1
    dropped = names1 - names2
    both = names1.intersection(names2)
    for name in sorted(added):
        warn(f'Added RPC call: {name}')
    for name in sorted(dropped):
        # TODO figure out a way to distinguish deprecations
        error(f'Dropped RPC call: {name}')
    for name in sorted(both):
        compare_rpc_call(name, old[name], new[name])


def compare_rpc_call(name, old, new):
    """Compare the signature of one rpc call between two dumps."""
    desc = f'rpc call {name}'
    compare_function(desc, old, new)


def compare_function(desc, old, new):
    """Compare two dump_func() results for the function described by desc.

    Dropped varargs/varkw, a reduced arg count, renamed overlapping args and
    dropped or changed defaults are errors; other differences are warnings.
    """
    old_builtin = bool(old.get('is_builtin'))
    new_builtin = bool(new.get('is_builtin'))
    if old_builtin or new_builtin:
        # dump_func records no signature data for builtins, so the lookups
        # below would fail with a KeyError
        if old_builtin != new_builtin:
            warn(f'Cannot compare signature for {desc}: builtin status changed')
        return

    if old['desc'] != new['desc']:
        warn(f'Signature changed for {desc}:\n  old: {old["desc"]}\n  new: {new["desc"]}')
        # this is mainly for human readability, the code below will note errors

    if not old['varargs']:
        if new['varargs']:
            warn(f'Added varargs for {desc}: {new["varargs"]}')
            # not really a problem, but worth noting
    elif not new['varargs']:
        error(f'Dropped varargs for {desc}')
    elif old['varargs'] != new['varargs']:
        # not really a problem
        warn(f'Renamed varargs for {desc}: {old["varargs"]} -> {new["varargs"]}')

    if not old['varkw']:
        if new['varkw']:
            warn(f'Added varkw for {desc}: {new["varkw"]}')
            # not really a problem, but worth noting
    elif not new['varkw']:
        error(f'Dropped varkw for {desc}')
    elif old['varkw'] != new['varkw']:
        # not really a problem
        warn(f'Renamed varkw for {desc}: {old["varkw"]} -> {new["varkw"]}')

    oargs = old['args']
    nargs = new['args']

    # arg counts
    if len(nargs) < len(oargs):
        error(f'Arg count reduced for {desc}')
        # this will break rest of code, so we stop checking here
        return

    # note extended args
    added = [a['name'] for a in nargs[len(oargs):]]
    if added:
        warn(f'Args extended for {desc}: {added!r}')

    # details for overlap
    for i, (oarg, narg) in enumerate(zip(oargs, nargs)):
        if oarg['name'] != narg['name']:
            error(f'Overlapping args do not match for {desc} arg {i}')
            # this breaks rest of checks
            return
        if 'default' in oarg:
            if 'default' not in narg:
                error(f'Dropped default value for {desc} arg {oarg["name"]}')
            elif narg['default'] != oarg['default']:
                error(f'Changed default value for {desc} arg {oarg["name"]}: '
                      f'{oarg["default"]} -> {narg["default"]}')
        elif 'default' in narg:
            warn(f'Added default value for {desc} arg {oarg["name"]}')


WARNINGS = []
ERRORS = []


def warn(msg):
    """Record msg in WARNINGS and write it to stderr."""
    WARNINGS.append(msg)
    sys.stderr.write(msg)
    sys.stderr.write('\n')


def error(msg):
    """Record msg in ERRORS and write it to stderr with an 'ERROR: ' prefix."""
    ERRORS.append(msg)
    sys.stderr.write('ERROR: ')
    sys.stderr.write(msg)
    sys.stderr.write('\n')


def report():
    """Print the number of warnings and errors recorded so far."""
    print(f'Got {len(WARNINGS)} warnings and {len(ERRORS)} errors')


if __name__ == '__main__':
    rv = main()
    sys.exit(rv)


# the end