diff --git a/util/koji-shadow b/util/koji-shadow new file mode 100755 index 00000000..e64d45b6 --- /dev/null +++ b/util/koji-shadow @@ -0,0 +1,799 @@ +#!/usr/bin/python + +# koji-shadow: a tool to shadow builds between koji instances +# Copyright (c) 2007-2008 Red Hat +# +# Koji is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; +# version 2.1 of the License. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this software; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: +# Mike McLean + +try: + import krbV +except ImportError: + pass +import koji +import ConfigParser +from email.MIMEText import MIMEText +import fnmatch +import optparse +import os +import pprint +import smtplib +import socket # for socket.error and socket.setdefaulttimeout +import sys +import time +import xmlrpclib # for ProtocolError and Fault + +# koji.fp.o keeps stalling, probably network errors... +# better to time out than to stall +socket.setdefaulttimeout(180) #XXX - too short? 
#NOTE: everything below is written in syntax that parses on both the old
#Python 2 interpreters this tool was written for and on Python 3
#(single-argument print(...), raise X(...), sys.exc_info() for the active
#exception).

OptionParser = optparse.OptionParser
if optparse.__version__ == "1.4.1+":
    #this optparse release gets a replacement error() method
    def _op_error(self, msg):
        """Print usage and the error message to stderr, then exit with status 2"""
        self.print_usage(sys.stderr)
        sys.stderr.write("%s: error: %s\n" % (self._get_prog_name(), msg))
        sys.exit(2)
    OptionParser.error = _op_error


def _(args):
    """Stub function for translation"""
    return args


def get_options():
    """Process options from command line and config file

    The command line is parsed twice: once to locate the config file, then
    again with the config file's [global] values installed as defaults, so
    that explicit command line options win over the config file.

    Returns a tuple (options, args). options.config is the ConfigParser
    instance that was read, or None when no config file was found.
    """

    usage = _("%prog [options]")
    parser = OptionParser(usage=usage)
    parser.add_option("-c", "--config-file", metavar="FILE",
                      help=_("use alternate configuration file"))
    parser.add_option("--keytab", help=_("specify a Kerberos keytab to use"))
    parser.add_option("--principal", help=_("specify a Kerberos principal to use"))
    parser.add_option("--runas", metavar="USER",
                      help=_("run as the specified user (requires special privileges)"))
    parser.add_option("--user", help=_("specify user"))
    parser.add_option("--password", help=_("specify password"))
    parser.add_option("--noauth", action="store_true", default=False,
                      help=_("do not authenticate"))
    parser.add_option("-n", "--test", action="store_true", default=False,
                      help=_("test mode"))
    parser.add_option("-d", "--debug", action="store_true", default=False,
                      help=_("show debug output"))
    parser.add_option("--first-one", action="store_true", default=False,
                      help=_("stop after scanning first build -- debugging"))
    parser.add_option("--debug-xmlrpc", action="store_true", default=False,
                      help=_("show xmlrpc debug output"))
    parser.add_option("--skip-main", action="store_true", default=False,
                      help=_("don't actually run main"))
    parser.add_option("--build",
                      help=_("scan just this build"))
    parser.add_option("-s", "--server",
                      help=_("url of local XMLRPC server"))
    parser.add_option("-r", "--remote",
                      help=_("url of remote XMLRPC server"))
    #parse once to get the config file
    (options, args) = parser.parse_args()

    defaults = parser.get_default_values()
    config = ConfigParser.ConfigParser()
    cf = getattr(options, 'config_file', None)
    if cf:
        if not os.access(cf, os.F_OK):
            #parser.error() exits the program; it does not return
            parser.error(_("No such file: %s") % cf)
    else:
        cf = '/etc/koji-shadow.conf'
        if not os.access(cf, os.F_OK):
            cf = None
    if not cf:
        print("no config file")
        config = None
    else:
        config.read(cf)
        #allow config file to update defaults for certain options
        #entries are [option name, (section, key) alias or None, value type]
        cfgmap = [
            ['keytab', None, 'string'],
            ['principal', None, 'string'],
            ['runas', None, 'string'],
            ['user', None, 'string'],
            ['password', None, 'string'],
            ['noauth', None, 'boolean'],
            ['server', None, 'string'],
            ['remote', None, 'string'],
        ]
        for name, alias, opt_type in cfgmap:
            if alias is None:
                alias = ('global', name)
            if not config.has_option(*alias):
                continue
            print("Using option %s from config file" % (alias,))
            if opt_type == 'integer':
                value = config.getint(*alias)
            elif opt_type == 'boolean':
                value = config.getboolean(*alias)
            else:
                value = config.get(*alias)
            setattr(defaults, name, value)
    #parse again with updated defaults
    (options, args) = parser.parse_args(values=defaults)
    options.config = config

    return options, args


#seconds per canonical time unit
time_units = {
    'second' : 1,
    'minute' : 60,
    'hour' : 3600,
    'day' : 86400,
    'week' : 604800,
}
time_unit_aliases = [
    #[unit, alias, alias, ...]
    ['week', 'weeks', 'wk', 'wks'],
    ['hour', 'hours', 'hr', 'hrs'],
    ['day', 'days'],
    ['minute', 'minutes', 'min', 'mins'],
    ['second', 'seconds', 'sec', 'secs', 's'],
]


def _parse_num(s):
    """Return s as an int, else as a float, else None"""
    for convert in (int, float):
        try:
            return convert(s)
        except ValueError:
            pass
    return None


def _unit_for_alias(word):
    """Return the canonical unit whose name or alias equals word, or None"""
    for names in time_unit_aliases:
        if word in names:
            return names[0]
    return None


def _split_number_unit(word):
    """Split a token like '90min' into (number, canonical unit)

    Returns (None, None) if word is not a number followed by a known unit.
    """
    for names in time_unit_aliases:
        for name in names:
            if word.endswith(name):
                num = _parse_num(word[:-len(name)])
                if num is not None:
                    return num, names[0]
    return None, None


def parse_duration(str):
    """Parse time duration from string, returns duration in seconds

    The string is a whitespace separated sequence of "<number> <unit>" pairs,
    where the unit may also be appended to the number without a space
    ("1 hour 30 mins", "90min"). Units are matched case-insensitively against
    time_unit_aliases. An empty string yields 0.

    Raises ValueError for an unknown unit, a unit without a number, or a
    trailing number without a unit (which used to be dropped silently).
    """
    total = 0
    pending = None    #number read, still waiting for its unit
    for word in str.lower().split():
        if pending is None:
            pending = _parse_num(word)
            if pending is not None:
                continue
            #perhaps the unit is appended w/o a space
            pending, unit = _split_number_unit(word)
        else:
            unit = _unit_for_alias(word)
        if unit is None:
            raise ValueError("Invalid time interval: %s" % str)
        total += pending * time_units[unit]
        pending = None
    if pending is not None:
        #a number with no unit after it
        raise ValueError("Invalid time interval: %s" % str)
    return total


def error(msg=None, code=1):
    """Write msg (if any) to stderr and exit with the given status code"""
    if msg:
        sys.stderr.write(msg + "\n")
        sys.stderr.flush()
    sys.exit(code)


def warn(msg):
    """Write msg to stderr"""
    sys.stderr.write(msg + "\n")
    sys.stderr.flush()


def ensure_connection(session):
    """Check that the hub answers and warn if its API version differs from ours

    Exits via error() if the API version call fails with a protocol error.
    """
    try:
        ret = session.getAPIVersion()
    except xmlrpclib.ProtocolError:
        error(_("Error: Unable to connect to server"))
    if ret != koji.API_VERSION:
        #format after the translation lookup, not before
        warn(_("WARNING: The server is at API version %d and the client is at %d")
             % (ret, koji.API_VERSION))


def activate_session(session):
    """Test and login the session is applicable

    Uses the module level options: nothing is done for --noauth, password
    login is used when --user is given, otherwise Kerberos is tried if the
    krbV module was imported. Exits via error() when login fails.
    """
    if options.noauth:
        #skip authentication
        pass
    elif options.user:
        #authenticate using user/password
        session.login()
    elif 'krbV' in sys.modules:
        try:
            if options.keytab and options.principal:
                session.krb_login(principal=options.principal,
                                  keytab=options.keytab,
                                  proxyuser=options.runas)
            else:
                session.krb_login(proxyuser=options.runas)
        except krbV.Krb5Error:
            e = sys.exc_info()[1]
            error(_("Kerberos authentication failed: '%s' (%s)") % (e.message, e.err_code))
        except socket.error:
            e = sys.exc_info()[1]
            #(errno, message) for most socket errors, but e.g. timeouts
            #carry a single argument, so take the last one
            if e.args:
                detail = e.args[-1]
            else:
                detail = e
            warn(_("Could not connect to Kerberos authentication service: '%s'") % detail)
    if not options.noauth and not session.logged_in:
        error(_("Error: unable to log in"))
    ensure_connection(session)
    if options.debug:
        print("successfully connected to hub")


def main(args):
    """Program body; currently just runs bar()"""
    #activate_session(session)
    bar()


def remote_buildroots(build_id):
    """Return a list of buildroots for remote build"""
    #XXX - only used in old test code (foo)
    brs = {}
    for rinfo in remote.listRPMs(build_id):
        br_id = rinfo.get('buildroot_id')
        if not br_id:
            print("Warning: no buildroot for: %s" % rinfo)
            continue
        brs[br_id] = 1
    return list(brs)


def remote_br_builds(brlist):
    """Given a list of buildroots, return build data of contents

    Returns a dict mapping remote build id to the remote getBuild() result.
    """
    #XXX - only used in old test code (foo)
    seen = {}
    builds = {}
    for br_id in brlist:
        if br_id in seen:
            continue
        seen[br_id] = 1
        for rinfo in remote.listRPMs(componentBuildrootID=br_id):
            builds[rinfo['build_id']] = 1
    return dict([(b, remote.getBuild(b)) for b in builds])


def foo():
    """just experimenting....

    For the remote build named by args[0], report which of the builds found
    in its buildroots exist on our server and which are missing.
    """
    binfo = remote.getBuild(args[0])
    buildroots = remote_buildroots(binfo['id'])
    if not buildroots:
        #nothing we can do
        return
    build_idx = remote_br_builds(buildroots)
    name_idx = {}
    for binfo2 in build_idx.values():
        name_idx.setdefault(binfo2['name'], []).append(binfo2)
    missing = {}
    found = {}
    for name, builds in name_idx.items():
        if len(builds) > 1:
            print("Warning: found multiple versions of %s: %s" % (name, builds))
            #pick latest (by completion time)
            build = sorted(builds, key=lambda b: b['completion_ts'])[-1]
        else:
            build = builds[0]
        nvr = "%(name)s-%(version)s-%(release)s" % build
        build.setdefault('nvr', nvr)
        #see if our server has it
        ours = session.getBuild(nvr)
        if ours:
            ours.setdefault('nvr', nvr)
            found[name] = ours
        else:
            missing[name] = build
    for name in sorted(found):
        print("Found common build: %(nvr)s" % found[name])
    for name in sorted(missing):
        print("Missing remote build: %(nvr)s" % missing[name])


class TrackedBuild(object):
    """A remote build and what we know about its status on our server

    state is one of:
      "common"  - our server has a COMPLETE build with the same nvr
      "broken"  - our server has the nvr in some other (non failed/canceled) state
      "missing" - we have no usable build; deps/br_tag/base describe its buildroot
      "noroot"  - missing, and the remote has no buildroot data for it
    """

    def __init__(self, build_id, child=None, tracker=None):
        """Look up remote build build_id and classify it against our server

        child, if given, is the id of a build that was built using this one.
        tracker, if given, has its state_idx kept up to date by setState().
        """
        self.id = build_id
        self.tracker = tracker
        self.info = remote.getBuild(build_id)
        self.nvr = "%(name)s-%(version)s-%(release)s" % self.info
        self.children = {}
        self.state = None
        self.order = 0
        #defaults, so the attributes exist for every state; getDeps() and
        #getBuildroots() fill them in for missing builds
        self.deps = {}
        self.br_tag = None
        self.base = []
        self.extraArches = []
        if child is not None:
            #children tracks the builds that were built using this one
            self.children[child] = 1
        #see if we have it
        ours = session.getBuild(self.nvr)
        self.rebuilt = False
        if ours is not None:
            state = koji.BUILD_STATES[ours['state']]
            if state == 'COMPLETE':
                self.setState("common")
                if ours['task_id']:
                    self.rebuilt = True
                return
            elif state in ('FAILED', 'CANCELED'):
                #treat these as having no build
                pass
            else:
                # DELETED, BUILDING
                self.setState("broken")
                return
        self.setState("missing")
        self.getDeps() #sets deps, br_tag, base, order, (maybe state)

    def setState(self, state):
        """Set our state, moving our entry in the tracker's state index"""
        if state == self.state:
            return
        if self.state is not None and self.tracker:
            del self.tracker.state_idx[self.state][self.id]
        self.state = state
        if self.tracker:
            self.tracker.state_idx.setdefault(self.state, {})[self.id] = 1

    def addChild(self, child):
        """Record child (a build id) as having been built using this build"""
        self.children[child] = 1

    def setExtraArchesFromRPMs(self, rpms=None):
        """Set extraArches to the rpm arches that are not their own canonical arch

        rpms is the remote rpm list for this build; it is fetched if omitted.
        """
        if rpms is None:
            rpms = remote.listRPMs(self.id)
        arches = {}
        for rpminfo in rpms:
            arches.setdefault(rpminfo['arch'], 1)
        self.extraArches = [a for a in arches if koji.canonArch(a) != a]

    def getBuildroots(self):
        """Return a list of buildroots for remote build"""
        rpms = remote.listRPMs(self.id)
        #while we've got the rpm list, let's note the extra arches
        #XXX - really should reorganize this a bit
        self.setExtraArchesFromRPMs(rpms)
        brs = {}
        bad = []
        for rinfo in rpms:
            br_id = rinfo.get('buildroot_id')
            if not br_id:
                bad.append(rinfo)
                continue
            brs[br_id] = 1
        if brs and bad:
            print("Warning: some rpms for %s lacked buildroots:" % self.nvr)
            for rinfo in bad:
                #each field needs its 's' conversion; without it the
                #format printed garbage
                print(" %(name)s-%(version)s-%(release)s.%(arch)s" % rinfo)
        return list(brs)

    def getDeps(self):
        """Work out what is needed to rebuild this build

        Sets order (highest buildroot id), deps (ids of the builds present in
        the buildroots), br_tag (most common buildroot tag) and base (packages
        that were part of the base install of every buildroot). Sets the state
        to "noroot" and returns if the remote has no buildroot data.
        """
        buildroots = sorted(self.getBuildroots())
        if not buildroots:
            self.setState("noroot")
            return
        self.order = buildroots[-1]
        builds = {} #track which builds we need for a rebuild
        bases = {} #track base install for buildroots
        tags = {} #track buildroot tag(s)
        for br_id in buildroots:
            br_info = remote.getBuildroot(br_id, strict=True)
            tags.setdefault(br_info['tag_name'], 0)
            tags[br_info['tag_name']] += 1
            for rinfo in remote.listRPMs(componentBuildrootID=br_id):
                builds[rinfo['build_id']] = 1
                if not rinfo['is_update']:
                    bases.setdefault(rinfo['name'], {})[br_id] = 1
        # we want to record the intersection of the base sets
        # XXX - this makes some assumptions about homogeneity that, while reasonable,
        # are not strictly required of the db.
        # The only way I can think of to break this is if some significant tag/target
        # changes happened during the build startup and some subtasks got the old
        # repo and others the new one.
        base = []
        for name, brlist in bases.items():
            for br_id in buildroots:
                if br_id not in brlist:
                    break
            else:
                #each buildroot had this as a base package
                base.append(name)
        if len(tags) > 1:
            print("Warning: found multiple buildroot tags for %s: %s" % (self.nvr, list(tags)))
            #use the tag that the most buildroots came from
            counts = [(n, tag) for tag, n in tags.items()]
            counts.sort()
            tag = counts[-1][1]
        else:
            tag = list(tags)[0]
        self.deps = builds
        self.br_tag = tag
        self.base = base


class BuildTracker(object):
    """Scans remote builds and their dependencies, and drives rebuilds"""

    def __init__(self):
        #per-instance, so that separate trackers do not share results
        self.builds = {}      #remote build id -> TrackedBuild
        self.state_idx = {}   #state name -> {build id: 1}

    def _scanWithRetries(self, build_id, from_build=None, depth=0, tries=10):
        """Call scanBuild, retrying on socket errors

        Returns the TrackedBuild, or None if every try failed.
        """
        for attempt in range(tries):
            try:
                return self.scanBuild(build_id, from_build=from_build, depth=depth)
            except (socket.timeout, socket.error):
                print("retry")
        return None

    def scanBuild(self, build_id, from_build=None, depth=0):
        """Recursively scan a build and its dependencies

        from_build is the TrackedBuild whose buildroot led us here (if any)
        and depth is the recursion depth, used to indent the output.
        Returns the TrackedBuild for build_id.
        """
        build = self.builds.get(build_id)
        if build:
            #already scanned
            if from_build:
                build.addChild(from_build.id)
            return build
        #otherwise...
        child_id = None
        if from_build:
            child_id = from_build.id
        build = TrackedBuild(build_id, child=child_id, tracker=self)
        self.builds[build_id] = build
        if len(self.builds) % 50 == 0:
            #periodic progress report
            self.report()
        if from_build:
            tail = " (from %s)" % from_build.nvr
        else:
            tail = ""
        head = " " * depth
        if build.state == "common":
            #we're good
            if build.rebuilt:
                print("%sCommon build (rebuilt) %s%s" % (head, build.nvr, tail))
            else:
                print("%sCommon build %s%s" % (head, build.nvr, tail))
        elif build.state == "noroot":
            #cannot be rebuilt: the remote has no record of its buildroot
            print("%sWarning: no buildroot data for %s%s" % (head, build.nvr, tail))
        elif build.state == "broken":
            #cannot be rebuilt: the nvr is already taken on our server
            print("%sWarning: build exists, but is invalid: %s%s" % (head, build.nvr, tail))
        elif build.state == "missing":
            #scan its deps
            print("%sMissing build %s%s. Scanning deps..." % (head, build.nvr, tail))
            for dep_id in build.deps:
                dep = self._scanWithRetries(dep_id, from_build=build, depth=depth+1)
                if dep is None:
                    print("Error: unable to scan dep: %i for %s" % (dep_id, build.nvr))
        return build

    def scanTag(self, tag):
        """Scan the latest builds in a remote tag"""
        taginfo = remote.getTag(tag)
        if not taginfo:
            error(_("Error: no such remote tag: %s") % tag)
        for binfo in remote.listTagged(taginfo['id'], latest=True):
            if self._scanWithRetries(binfo['id']) is None:
                print("Error: unable to scan %(name)s-%(version)s-%(release)s" % binfo)
                continue
            if options.first_one:
                return

    def scan(self):
        """Scan based on config file"""
        #XXX - unfinished: nothing is done with these yet
        to_scan = []
        alltags = remote.listTags()

    def rebuild(self, build):
        """Rebuild a remote build using closest possible buildroot

        Currently this only prepares the SHADOWBUILD-<tag> tag on our server
        (package list, tagged builds, build group); kicking off the actual
        build is still TODO. Returns None, also when it gives up early
        because a dependency is not available.
        """
        #first check that we can
        deps = []
        for build_id in build.deps:
            dep = self.builds.get(build_id)
            if not dep:
                print("Missing dependency %i for %s. Not scanned?" % (build_id, build.nvr))
                return
            if dep.state != 'common':
                print("Dependency missing for %s: %s (%s)" % (build.nvr, dep.nvr, dep.state))
                return
            deps.append(dep)
        #check/create tag
        our_tag = "SHADOWBUILD-%s" % build.br_tag
        taginfo = session.getTag(our_tag)
        if not taginfo:
            #XXX - not sure what is best here
            #how do we pick arches? for now just assume all....
            #XXX this call for perms is stupid, but it's all we've got
            perm_id = None
            for data in session.getAllPerms():
                if data['name'] == 'admin':
                    perm_id = data['id']
                    break
            session.createTag(our_tag, perm=perm_id,
                              arches='i386 ia64 ppc ppc64 s390 s390x x86_64')
            taginfo = session.getTag(our_tag, strict=True)
            session.createBuildTarget(taginfo['name'], taginfo['id'], taginfo['id'])
        else:
            parents = session.getInheritanceData(taginfo['id'])
            if parents:
                print("Warning: shadow build tag has inheritance")
        #check package list
        pkgs = {}
        for pkg in session.listPackages(tagID=taginfo['id']):
            pkgs[pkg['package_name']] = pkg
        missing_pkgs = []
        queued = {}  #names already in missing_pkgs, to avoid adding one twice
        for dep in deps:
            name = dep.info['name']
            if name in pkgs or name in queued:
                continue
            #guess owner: whoever owns this package in the most places
            owners = {}
            for pkg in session.listPackages(pkgID=name):
                owners.setdefault(pkg['owner_id'], []).append(pkg)
            if owners:
                order = sorted([(len(v), k) for k, v in owners.items()])
                owner = order[-1][1]
            else:
                #just use ourselves
                owner = session.getLoggedInUser()['id']
            missing_pkgs.append((name, owner))
            queued[name] = 1
        #check build list
        cur_builds = {}
        for binfo in session.listTagged(taginfo['id']):
            #index by name in tagging order (latest first)
            cur_builds.setdefault(binfo['name'], []).append(binfo)
        to_untag = []
        to_tag = []
        for dep in deps:
            #XXX - assuming here that there is only one dep per 'name'
            # may want to check that this is true
            tagged = False
            for binfo in cur_builds.get(dep.info['name'], []):
                if binfo['nvr'] == dep.nvr:
                    tagged = True
                    #may not be latest now, but it will be after we do all the untagging
                else:
                    # note that the untagging keeps older builds from piling up. In a sense
                    # we're gc-pruning this tag ourselves every pass.
                    to_untag.append(binfo)
            if not tagged:
                to_tag.append(dep)
        drop_groups = []
        build_group = None
        for group in session.getTagGroups(taginfo['id']):
            if group['name'] == 'build':
                build_group = group
            else:
                # we should have no other groups but build
                print("Warning: found stray group: %s" % group)
                drop_groups.append(group['name'])
        if build_group:
            #TODO - fix build group package list based on base of build to shadow
            needed = set(build.base)
            current = set([p['package'] for p in build_group['packagelist']])
            add_pkgs = [n for n in needed if n not in current]
            drop_pkgs = [n for n in current if n not in needed]
            #no group deps needed/allowed
            drop_deps = [(g['name'], 1) for g in build_group['grouplist']]
            if drop_deps:
                print("Warning: build group had deps: %r" % build_group)
        else:
            add_pkgs = build.base
            drop_pkgs = []
            drop_deps = []
        #NOTE(review): drop_groups and drop_deps are collected but nothing
        #acts on them yet
        #update package list, tagged packages, and groups in one multicall/transaction
        #(avoid useless repo regens)
        session.multicall = True
        for name, owner in missing_pkgs:
            session.packageListAdd(taginfo['id'], name, owner=owner)
        for binfo in to_untag:
            session.untagBuildBypass(taginfo['id'], binfo['id'])
        for dep in to_tag:
            session.tagBuildBypass(taginfo['id'], dep.nvr)
            #shouldn't need force here
        #set groups data
        if not build_group:
            # build group not present. add it
            session.groupListAdd(taginfo['id'], 'build', force=True)
            #using force in case group is blocked. This shouldn't be the case, but...
        for pkg_name in drop_pkgs:
            #in principle, our tag should not have inheritance, so the remove call is the right thing
            session.groupPackageListRemove(taginfo['id'], 'build', pkg_name)
        for pkg_name in add_pkgs:
            session.groupPackageListAdd(taginfo['id'], 'build', pkg_name)
            #we never add any blocks, so forcing shouldn't be required
        #TODO - adjust extra_arches for package to build
        #TODO - get event id to facilitate waiting on repo
        #       not sure if getLastEvent is good enough
        #       short of adding a new call, perhaps use getLastEvent together with event of
        #       current latest repo for tag
        session.getLastEvent()
        results = session.multiCall()
        #NOTE(review): assumes the last multicall result unpacks into exactly
        #two values - confirm against the hub's getLastEvent/multiCall format
        [event_id, event_ts] = results[-1]
        #TODO - verify / check results ?
        #TODO - call newRepo
        #TODO - upload src
        src = "" #XXX
        #TODO - wait for repo
        #TODO - kick off build
        #task_id = session.build(src, taginfo['name'], ... ) #XXX
        #TODO - add task/build to some sort of watch list
        #TODO - post-build validation

    def report(self):
        """Print the time, the number of scanned builds and a count per state"""
        print(time.asctime())
        print("%i builds" % len(self.builds))
        for s in sorted(self.state_idx):
            print("%s: %i" % (s, len(self.state_idx[s])))

    def _sortedBuilds(self):
        """Return (order, id, build) tuples for all scanned builds, sorted"""
        #ids are unique, so the build objects themselves are never compared
        return sorted([(b.order, b.id, b) for b in self.builds.values()])

    def runRebuilds(self):
        """Rebuild missing builds"""
        print("Determining rebuild order")
        b_avail = {}    #build id -> 1 if available on our side, else 0
        ok = 0
        bad = 0
        for order, build_id, build in self._sortedBuilds():
            if build.state == 'common':
                b_avail[build_id] = 1
            elif build.state == 'missing':
                #check deps
                not_avail = [x for x in build.deps if not b_avail.get(x)]
                if not_avail:
                    print("Can't rebuild %s, missing %i deps" % (build.nvr, len(not_avail)))
                    b_avail[build_id] = 0
                    bad += 1
                    for dep_id in not_avail:
                        dep = self.builds.get(dep_id)
                        if dep is None:
                            #the scan of this dep failed, so we know nothing about it
                            print(" %i (not scanned)" % dep_id)
                        elif b_avail.get(dep_id) is None:
                            print(" %s (out of order?)" % dep.nvr)
                        else:
                            print(" %s (%s)" % (dep.nvr, dep.state))
                else:
                    ok += 1
                    print("rebuild: %s" % build.nvr)
                    self.rebuild(build)
                    b_avail[build_id] = 1
                    break #XXX - stop after the first rebuild for now
            else:
                print("build: %s, state: %s, #children: %i" \
                    % (build.nvr, build.state, len(build.children)))
                b_avail[build_id] = 0
        print("ok: %i, bad: %i" % (ok, bad))

    def showOrder(self):
        """Show order of rebuilds (for debugging)

        This is sort of a dress rehearsal for the rebuild scheduler
        """
        print("Determining rebuild order")
        b_avail = {}    #build id -> 1 if (it would be) available, else 0
        ok = 0
        bad = 0
        for order, build_id, build in self._sortedBuilds():
            if build.state == 'common':
                b_avail[build_id] = 1
            elif build.state == 'missing':
                #for sanity, check deps
                for dep_id in build.deps:
                    dep = self.builds.get(dep_id)
                    avail = b_avail.get(dep_id)
                    if avail:
                        continue
                    if dep is None:
                        #the scan of this dep failed, so we know nothing about it
                        print("Can't rebuild %s, missing %i (not scanned)" % (build.nvr, dep_id))
                    elif avail is None:
                        print("Can't rebuild %s, missing %s (out of order?)" % (build.nvr, dep.nvr))
                    else:
                        print("Can't rebuild %s, missing %s (%s)" % (build.nvr, dep.nvr, dep.state))
                    b_avail[build_id] = 0
                    bad += 1
                    break
                else:
                    ok += 1
                    print("rebuild: %s" % build.nvr)
                    b_avail[build_id] = 1
            else:
                print("build: %s, state: %s, #children: %i" \
                    % (build.nvr, build.state, len(build.children)))
                b_avail[build_id] = 0
        print("ok: %i, bad: %i" % (ok, bad))


def bar():
    """Scan the requested build or tag, report, and run the rebuilds

    Scans the build given with --build, otherwise the remote tag named by the
    first positional argument. In test mode (-n/--test) the rebuild step,
    which modifies the local hub, is skipped.
    """
    tracker = BuildTracker()
    if options.build:
        binfo = remote.getBuild(options.build, strict=True)
        tracker.scanBuild(binfo['id'])
    elif args:
        tracker.scanTag(args[0])
    else:
        error(_("Error: specify a remote tag to scan, or use --build"))
    tracker.report()
    tracker.showOrder()
    if options.test:
        print("test mode: not running rebuilds")
    else:
        tracker.runRebuilds()


if __name__ == "__main__":

    options, args = get_options()

    session_opts = {}
    for k in ('user', 'password', 'debug_xmlrpc', 'debug'):
        session_opts[k] = getattr(options, k)
    session = koji.ClientSession(options.server, session_opts)
    if not options.noauth:
        session.login()
    #XXX - sane auth
    #XXX - config!
    remote = koji.ClientSession(options.remote, session_opts)
    rv = 0
    try:
        if not options.skip_main:
            rv = main(args)
        if not rv:
            rv = 0
    except KeyboardInterrupt:
        pass
    except SystemExit:
        #NOTE(review): this flattens whatever exit code was requested to 1
        rv = 1
    try:
        session.logout()
    except Exception:
        #best effort only; we are exiting anyway
        pass
    sys.exit(rv)