#!/usr/bin/python # Koji Repository Administrator (kojira) # Copyright (c) 2005-2007 Red Hat # # Koji is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; # version 2.1 of the License. # # This software is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this software; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA # # Authors: # Mike McLean try: import krbV except ImportError: pass import sys import os import koji from optparse import OptionParser from ConfigParser import ConfigParser import logging import logging.handlers import pprint import signal import time import traceback def safe_rmtree(path, strict=True): logger = logging.getLogger("koji.repo") #safe remove: with -xdev the find cmd will not cross filesystems # (though it will cross bind mounts from the same filesystem) if not os.path.exists(path): logger.debug("No such path: %s" % path) return #first rm -f non-directories logger.debug('Removing files under %s' % path) rv = os.system("find '%s' -xdev \\! -type d -print0 |xargs -0 rm -f" % path) msg = 'file removal failed (code %r) for %s' % (rv,path) if rv != 0: logger.warn(msg) if strict: raise koji.GenericError, msg else: return rv #them rmdir directories #with -depth, we start at the bottom and work up logger.debug('Removing directories under %s' % path) rv = os.system("find '%s' -xdev -depth -type d -print0 |xargs -0 rmdir" % path) msg = 'dir removal failed (code %r) for %s' % (rv,path) if rv != 0: logger.warn(msg) if strict: raise koji.GenericError, msg return rv class ManagedRepo(object): def __init__(self, data): self.logger = logging.getLogger("koji.repo") self.current = True self.repo_id = data['id'] self.event_id = data['create_event'] self.event_ts = data['create_ts'] self.tag_id = data['tag_id'] self.state = data['state'] order = session.getFullInheritance(self.tag_id, event=self.event_id) #order may contain same tag more than once tags = {self.tag_id : 1} for x in order: tags[x['parent_id']] = 1 self.taglist = tags.keys() def expire(self): """Mark the repo expired""" if self.state == koji.REPO_EXPIRED: return elif self.state == koji.REPO_DELETED: raise GenericError, "Repo already deleted" self.logger.info("Expiring repo %s.." % self.repo_id) session.repoExpire(self.repo_id) self.state = koji.REPO_EXPIRED def expired(self): return self.state == koji.REPO_EXPIRED def pending(self, timeout=180): """Determine if repo generation appears to be in progress and not already obsolete""" if self.state != koji.REPO_INIT: return False age = time.time() - self.event_ts return self.isCurrent(ignore_state=True) and age < timeout def stale(self): """Determine if repo seems stale By stale, we mean: - state=INIT - timestamp really, really old """ timeout = 36000 #XXX - config if self.state != koji.REPO_INIT: return False age = time.time() - self.event_ts return age > timeout def tryDelete(self): """Remove the repo from disk, if possible""" tag_info = session.getTag(self.tag_id) if not tag_info: self.logger.warn('Could not get info for tag %i, skipping delete of repo %i' % (self.tag_id, self.repo_id)) return False tag_name = tag_info['name'] path = pathinfo.repo(self.repo_id, tag_name) try: #also check dir age. We do this because a repo can be created from an older event #and should not be removed based solely on that event's timestamp. mtime = os.stat(path).st_mtime except OSError: self.logger.error("Can't stat repo directory: %s" % path) return True age = time.time() - max(self.event_ts, mtime) if age < options.deleted_repo_lifetime: #XXX should really be called expired_repo_lifetime return False self.logger.debug("Attempting to delete repo %s.." % self.repo_id) if self.state != koji.REPO_EXPIRED: raise GenericError, "Repo not expired" if session.repoDelete(self.repo_id) > 0: #cannot delete, we are referenced by a buildroot self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id) return False self.logger.info("Deleted repo %s" % self.repo_id) self.state = koji.REPO_DELETED safe_rmtree(path, strict=False) return True def ready(self): return self.state == koji.REPO_READY def deleted(self): return self.state == koji.REPO_DELETED def problem(self): return self.state == koji.REPO_PROBLEM def isCurrent(self, ignore_state=False): if not self.current: # no point in checking again return False if not ignore_state and self.state != koji.REPO_READY: #also no point in checking return False self.logger.debug("Checking for changes: %r" % self.taglist) if session.tagChangedSinceEvent(self.event_id,self.taglist): self.logger.debug("Tag data has changed since event %r" % self.event_id) self.current = False else: self.logger.debug("No tag changes since event %r" % self.event_id) return self.current class RepoManager(object): def __init__(self): self.repos = {} self.tasks = {} self.logger = logging.getLogger("koji.repo.manager") def printState(self): for repo in self.repos.itervalues(): self.logger.debug("repo %s: tag=%s, state=%s" % (repo.repo_id, repo.tag_id, koji.REPO_STATES[repo.state])) for tag_id, task_id in self.tasks.iteritems(): self.logger.debug("task %s for tag %s" % (task_id, tag_id)) def readCurrentRepos(self): self.logger.debug("Reading current repo data") repodata = session.getActiveRepos() self.logger.debug("Repo data: %r" % repodata) for data in repodata: repo_id = data['id'] repo = self.repos.get(repo_id) if repo: #we're already tracking it if repo.state != data['state']: self.logger.info('State changed for repo %s: %s -> %s' %(repo_id, koji.REPO_STATES[repo.state], koji.REPO_STATES[data['state']])) repo.state = data['state'] else: self.logger.info('Found repo %s, state=%s' %(repo_id, koji.REPO_STATES[data['state']])) self.repos[repo_id] = ManagedRepo(data) def pruneLocalRepos(self): """Scan filesystem for repos and remove any deleted ones Also, warn about any oddities""" self.logger.debug("Scanning filesystem for repos") topdir = "%s/repos" % pathinfo.topdir count = 0 for tag in os.listdir(topdir): tagdir = "%s/%s" % (topdir, tag) if not os.path.isdir(tagdir): continue taginfo = session.getTag(tag) if taginfo is None: self.logger.warn("Unexpected directory (no such tag): %s" % tagdir) continue for repo_id in os.listdir(tagdir): if count >= options.prune_batch_size: #this keeps us from spending too much time on this at one time return repodir = "%s/%s" % (tagdir, repo_id) if not os.path.isdir(repodir): continue try: repo_id = int(repo_id) except ValueError: continue if self.repos.has_key(repo_id): #we're already managing it, no need to deal with it here continue try: dir_ts = os.stat(repodir).st_mtime except OSError: #just in case something deletes the repo out from under us continue rinfo = session.repoInfo(repo_id) if rinfo is None: age = time.time() - dir_ts if age > 36000: if not options.ignore_stray_repos: self.logger.warn("Unexpected directory (no such repo): %s" % repodir) continue if rinfo['tag_name'] != taginfo['name']: self.logger.warn("Tag name mismatch: %s" % repodir) continue if rinfo['state'] in (koji.REPO_DELETED, koji.REPO_PROBLEM): age = time.time() - max(rinfo['create_ts'], dir_ts) if age > options.deleted_repo_lifetime: #XXX should really be called expired_repo_lifetime count += 1 logger.info("Removing stray repo (state=%s): %s" % (koji.REPO_STATES[rinfo['state']], repodir)) safe_rmtree(repodir, strict=False) pass def updateRepos(self): #check on tasks for tag_id, task_id in self.tasks.items(): tinfo = session.getTaskInfo(task_id) tstate = koji.TASK_STATES[tinfo['state']] if tstate == 'CLOSED': self.logger.info("Finished: newRepo task %s for tag %s" % (task_id, tag_id)) del self.tasks[tag_id] elif tstate in ('CANCELED', 'FAILED'): self.logger.info("Problem: newRepo task %s for tag %s is %s" % (task_id, tag_id, tstate)) del self.tasks[tag_id] #TODO [?] - implement a timeout for active tasks? self.logger.debug("Current tasks: %r" % self.tasks) if len(self.tasks) >= options.max_repo_tasks: self.logger.info("Maximum number of repo tasks reached.") return self.logger.debug("Updating repos") self.readCurrentRepos() #check for stale repos for repo in self.repos.values(): if repo.stale(): repo.expire() #find out which tags require repos tags = {} for target in session.getBuildTargets(): tag_id = target['build_tag'] tags[tag_id] = target['build_tag_name'] #index repos by tag tag_repos = {} for repo in self.repos.values(): tag_repos.setdefault(repo.tag_id, []).append(repo) self.logger.debug("Needed tags: %r" % tags.keys()) self.logger.debug("Current tags: %r" % tag_repos.keys()) #we need to determine: # - which tags need a new repo # - if any repos seem to be broken regen = [] for tag_id in tags.iterkeys(): covered = False for repo in tag_repos.get(tag_id,[]): if repo.isCurrent(): covered = True break elif repo.pending(): #one on the way covered = True break if covered: continue if self.tasks.has_key(tag_id): #repo creation in progress #TODO - implement a timeout continue #tag still appears to be uncovered #figure out how old existing repo is ts = 0 for repo in tag_repos.get(tag_id, []): if repo.event_ts > ts: ts = repo.event_ts regen.append((ts, tag_id)) regen.sort() self.logger.debug("order: %s", regen) # i.e. tags with oldest (or no) repos get precedence for ts, tag_id in regen: if len(self.tasks) >= options.max_repo_tasks: self.logger.info("Maximum number of repo tasks reached.") break task_id = session.newRepo(tags[tag_id]) self.logger.info("Created newRepo task %s for tag %s (%s)" % (task_id, tag_id, tags[tag_id])) self.tasks[tag_id] = task_id #some cleanup n_deletes = 0 for tag_id, repolist in tag_repos.items(): if not tags.has_key(tag_id): #repos for these tags are no longer required for repo in repolist: if repo.ready(): repo.expire() for repo in repolist: if n_deletes >= options.delete_batch_size: break if repo.expired(): #try to delete if repo.tryDelete(): n_deletes += 1 def main(): repomgr = RepoManager() repomgr.readCurrentRepos() repomgr.pruneLocalRepos() logger.info("Entering main loop") while True: try: repomgr.updateRepos() repomgr.printState() repomgr.pruneLocalRepos() except KeyboardInterrupt: logger.warn("User exit") break except koji.AuthExpired: logger.warn("Session expired") break except SystemExit: logger.warn("Shutting down") break except: # log the exception and continue logger.error(''.join(traceback.format_exception(*sys.exc_info()))) try: time.sleep(5) except KeyboardInterrupt: logger.warn("User exit") break try: session.logout() finally: sys.exit() def _exit_signal_handler(signum, frame): logger.error('Exiting on signal') session.logout() sys.exit(1) def get_options(): """process options from command line and config file""" # parse command line args parser = OptionParser("usage: %prog [opts]") parser.add_option("-c", "--config", dest="configFile", help="use alternate configuration file", metavar="FILE", default="/etc/kojira/kojira.conf") parser.add_option("--user", help="specify user") parser.add_option("--password", help="specify password") parser.add_option("--principal", help="Kerberos principal") parser.add_option("--keytab", help="Kerberos keytab") parser.add_option("-f", "--fg", dest="daemon", action="store_false", default=True, help="run in foreground") parser.add_option("-d", "--debug", action="store_true", help="show debug output") parser.add_option("-q", "--quiet", action="store_true", help="don't show warnings") parser.add_option("-v", "--verbose", action="store_true", help="show verbose output") parser.add_option("--with-src", action="store_true", help="include srpms in repos") parser.add_option("--force-lock", action="store_true", default=False, help="force lock for exclusive session") parser.add_option("--debug-xmlrpc", action="store_true", default=False, help="show xmlrpc debug output") parser.add_option("--skip-main", action="store_true", default=False, help="don't actually run main") parser.add_option("--show-config", action="store_true", default=False, help="Show config and exit") parser.add_option("-s", "--server", help="URL of XMLRPC server") parser.add_option("--topdir", help="Specify topdir") parser.add_option("--logfile", help="Specify logfile") (options, args) = parser.parse_args() config = ConfigParser() config.read(options.configFile) section = 'kojira' for x in config.sections(): if x != section: quit('invalid section found in config file: %s' % x) defaults = {'with_src': False, 'verbose': False, 'debug': False, 'ignore_stray_repos': False, 'topdir': '/mnt/koji', 'server': None, 'logfile': '/var/log/kojira.log', 'principal': None, 'keytab': None, 'retry_interval': 60, 'max_retries': 120, 'offline_retry': True, 'offline_retry_interval': 120, 'prune_batch_size': 4, 'delete_batch_size': 3, 'max_repo_tasks' : 10, 'deleted_repo_lifetime': 7*24*3600, #XXX should really be called expired_repo_lifetime 'cert': '/etc/kojira/client.crt', 'ca': '/etc/kojira/clientca.crt', 'serverca': '/etc/kojira/serverca.crt' } if config.has_section(section): int_opts = ('prune_batch_size', 'deleted_repo_lifetime', 'max_repo_tasks', 'delete_batch_size', 'retry_interval', 'max_retries', 'offline_retry_interval') str_opts = ('topdir','server','user','password','logfile', 'principal', 'keytab', 'cert', 'ca', 'serverca') bool_opts = ('with_src','verbose','debug','ignore_stray_repos', 'offline_retry') for name in config.options(section): if name in int_opts: defaults[name] = config.getint(section, name) elif name in str_opts: defaults[name] = config.get(section, name) elif name in bool_opts: defaults[name] = config.getboolean(section, name) else: quit("unknown config option: %s" % name) for name, value in defaults.items(): if getattr(options, name, None) is None: setattr(options, name, value) if options.logfile in ('','None','none'): options.logfile = None return options def quit(msg=None, code=1): if msg: logging.getLogger("koji.repo").error(msg) sys.stderr.write('%s\n' % msg) sys.stderr.flush() sys.exit(code) if __name__ == "__main__": options = get_options() topdir = getattr(options,'topdir',None) pathinfo = koji.PathInfo(topdir) if options.show_config: pprint.pprint(options.__dict__) sys.exit() if options.logfile: if not os.path.exists(options.logfile): try: logfile = open(options.logfile, "w") logfile.close() except: sys.stderr.write("Cannot create logfile: %s\n" % options.logfile) sys.exit(1) if not os.access(options.logfile,os.W_OK): sys.stderr.write("Cannot write to logfile: %s\n" % options.logfile) sys.exit(1) koji.add_file_logger("koji", options.logfile) #note we're setting logging for koji.* logger = logging.getLogger("koji") if options.debug: logger.setLevel(logging.DEBUG) elif options.verbose: logger.setLevel(logging.INFO) elif options.quiet: logger.setLevel(logging.ERROR) else: logger.setLevel(logging.WARNING) session_opts = {} for k in ('user', 'password', 'debug_xmlrpc', 'debug', 'retry_interval', 'max_retries', 'offline_retry', 'offline_retry_interval'): session_opts[k] = getattr(options,k) session = koji.ClientSession(options.server,session_opts) if os.path.isfile(options.cert): # authenticate using SSL client certificates session.ssl_login(options.cert, options.ca, options.serverca) elif options.user: # authenticate using user/password session.login() elif sys.modules.has_key('krbV') and options.principal and options.keytab: session.krb_login(options.principal, options.keytab) #get an exclusive session try: session.exclusiveSession(force=options.force_lock) except koji.AuthLockError: quit("Error: Unable to get lock. Trying using --force-lock") if not session.logged_in: quit("Error: Unknown login error") if not session.logged_in: print "Error: unable to log in" sys.exit(1) if options.skip_main: sys.exit() elif options.daemon: koji.daemonize() else: koji.add_stderr_logger("koji") main()