#!/usr/bin/python3 import sys import configparser import datetime import optparse import os import koji def error(msg=None, code=1): if msg: msg = "ERROR: %s\n" % msg sys.stderr.write(msg) sys.stderr.flush() sys.exit(code) def warn(msg): msg = "WARNING: %s\n" % msg sys.stderr.write(msg) sys.stderr.flush() def get_options(): """process options from command line and config file""" parser = optparse.OptionParser(usage="%prog [options]") parser.add_option("-c", "--config", metavar="FILE", help="use alternate config file") parser.add_option("-s", "--server", help="url of koji XMLRPC server") parser.add_option("--keytab", help="specify a Kerberos keytab to use") parser.add_option("--principal", help="specify a Kerberos principal to use") parser.add_option("--runas", metavar="USER", help="run as the specified user (requires special privileges)") parser.add_option("--user", help="specify user") parser.add_option("--password", help="specify password") parser.add_option("--noauth", action="store_true", default=False, help="do not authenticate") parser.add_option("--cert", help="Client SSL certificate file for authentication") parser.add_option("--serverca", help="CA cert file that issued the hub certificate") parser.add_option("-d", "--debug", action="store_true", default=False, help="show debug output") parser.add_option("--debug-xmlrpc", action="store_true", default=False, help="show xmlrpc debug output") parser.add_option("-t", "--test", action="store_true", help="test mode, no tag is deleted") parser.add_option("--no-empty", action="store_false", dest="clean_empty", default=True, help="don't run emptiness check") parser.add_option("--empty-delay", action="store", metavar="DAYS", default=1, type=int, help="delete empty tags older than DAYS") parser.add_option("--no-old", action="store_false", dest="clean_old", default=True, help="don't run old check") parser.add_option("--old-delay", action="store", metavar="DAYS", default=30, type=int, help="delete older tags than timestamp") parser.add_option("--ignore-tags", metavar="PATTERN", action="append", help="Ignore tags matching PATTERN when pruning") parser.add_option("--no-inactive", action="store_false", dest="clean_inactive", default=True, help="don't run inactive check") parser.add_option("--inactive-delay", action="store", metavar="DAYS", default=10, type=int, help="delete tags inactive for DAYS (no build was un/tagged there)") # parse once to get the config file (options, args) = parser.parse_args() defaults = parser.get_default_values() config = configparser.ConfigParser() cf = getattr(options, 'config', None) if cf: if not os.access(cf, os.F_OK): parser.error("No such file: %s" % cf) assert False # pragma: no cover else: cf = '/etc/koji-gc/koji-gc.conf' if not os.access(cf, os.F_OK): cf = None if not cf: print("no config file") config = None else: config.read(cf) # List of values read from config file to update default parser values cfgmap = [ # name, alias, type ['keytab', None, 'string'], ['principal', None, 'string'], ['runas', None, 'string'], ['user', None, 'string'], ['password', None, 'string'], ['noauth', None, 'boolean'], ['cert', None, 'string'], ['serverca', None, 'string'], ['server', None, 'string'], ['no_ssl_verify', None, 'boolean'], ] for name, alias, type in cfgmap: if alias is None: alias = ('main', name) if config.has_option(*alias): if options.debug: print("Using option %s from config file" % (alias,)) if type == 'integer': setattr(defaults, name, config.getint(*alias)) elif type == 'boolean': setattr(defaults, name, config.getboolean(*alias)) else: setattr(defaults, name, config.get(*alias)) # parse again with defaults (options, args) = parser.parse_args(values=defaults) options.config = config # special handling for cert defaults cert_defaults = { 'cert': '/etc/koji-gc/client.crt', 'serverca': '/etc/koji-gc/serverca.crt', } for name in cert_defaults: if getattr(options, name, None) is None: fn = cert_defaults[name] if os.path.exists(fn): setattr(options, name, fn) return options, args def ensure_connection(session): try: ret = session.getAPIVersion() except koji.xmlrpcplus.xmlrpc_client.ProtocolError: error("Unable to connect to server") if ret != koji.API_VERSION: warn("The server is at API version %d and the client is at %d" % (ret, koji.API_VERSION)) def activate_session(session): """Test and login the session is applicable""" global options if options.noauth: # skip authentication pass elif options.cert is not None and os.path.isfile(options.cert): # authenticate using SSL client cert session.ssl_login(options.cert, None, options.serverca, proxyuser=options.runas) elif options.user: # authenticate using user/password session.login() elif options.keytab and options.principal: try: if options.keytab and options.principal: if not isinstance(options.keytab, str): raise koji.ParameterError('Invalid type of keytab: %s' % type(options.keytab)) if not isinstance(options.principal, str): raise koji.ParameterError('Invalid type of principal: %s' % type(options.principal)) session.gssapi_login( principal=options.principal, keytab=options.keytab, proxyuser=options.runas) else: session.gssapi_login(proxyuser=options.runas) except Exception as e: error("GSSAPI authentication failed: %s (%s)" % (e.args[1], e.args[0])) if not options.noauth and not session.logged_in: error("unable to log in, no authentication methods available") ensure_connection(session) if options.debug: print("successfully connected to hub") def get_all(): tags = session.listSideTags() sidetags = [] session.multicall = True for tag in tags: session.getTag(tag['id']) for tag in session.multiCall(): sidetags.append(tag[0]) return sidetags def delete_tags(tags): session.multicall = True for tag in tags: session.removeSideTag(tag['id']) session.multiCall() def clean_empty(tags): # delete empty tags which are older than --empty-delay if not options.clean_empty: return tags passed = [] candidates = [] deleted = [] session.multicall = True for tag in tags: session.listTagged(tag['id']) for tag, tagged in zip(tags, session.multiCall()): if len(tagged[0]) == 0: candidates.append(tag) else: passed.append(tag) # check age d = datetime.datetime.now() now_ts = d.timestamp() old_ts = (d - datetime.timedelta(options.empty_delay)).timestamp() session.multicall = True for tag in candidates: session.queryHistory(['tag_config'], tag=tag['id']) for tag, history in zip(candidates, session.multiCall()): create_ts = history[0]['tag_config'][0]['create_ts'] if create_ts < old_ts: diff = datetime.timedelta(seconds=now_ts - create_ts) print("[empty] %s (%s)" % (tag['name'], diff)) if not options.test: deleted.append(tag) else: passed.append(tag) delete_tags(deleted) return passed def clean_old(tags): # delete tags that are older that --old-delay if not options.clean_old: return tags passed = [] deleted = [] d = datetime.datetime.now() now_ts = d.timestamp() old_ts = (d - datetime.timedelta(options.old_delay)).timestamp() session.multicall = True for tag in tags: session.queryHistory(['tag_config'], tag=tag['id']) for tag, history in zip(tags, session.multiCall()): create_ts = history[0]['tag_config'][0]['create_ts'] if create_ts < old_ts: diff = datetime.timedelta(seconds=now_ts - create_ts) print(f"[old] {tag['name']} ({diff})") if not options.test: deleted.append(tag) else: passed.append(tag) delete_tags(deleted) return passed def clean_inactive(tags): """Check latest tagged build against inactive_delay""" if not options.clean_inactive: return tags passed = [] deleted = [] d = datetime.datetime.now() now_ts = d.timestamp() old_ts = (d - datetime.timedelta(options.inactive_delay)).timestamp() session.multicall = True for tag in tags: session.queryHistory(['tag_listing'], tag=tag['id']) for tag, history in zip(tags, session.multiCall()): create_ts = 0 for h in history[0]['tag_listing']: if h['create_ts'] > create_ts: create_ts = h['create_ts'] # if there was never tagged any build it should be subject to clean_empty policy # we will skip this case here (create_ts = 0) if create_ts and create_ts < old_ts: diff = datetime.timedelta(seconds=now_ts - create_ts) print(f"[inactive] {tag['name']} ({diff})") if not options.test: deleted.append(tag) else: passed.append(tag) delete_tags(deleted) return passed def main(args): activate_session(session) sidetags = get_all() sidetags = clean_empty(sidetags) sidetags = clean_old(sidetags) sidetags = clean_inactive(sidetags) if __name__ == "__main__": options, args = get_options() session_opts = koji.grab_session_options(options) session = koji.ClientSession(options.server, session_opts) main(args)