debian-koji/util/kojira
Mike McLean 54f79ff665 make ClientSession retries more configurable, and more robust
add an offline mode to the hub (ServerOffline fault)
report offline status if db connection fails
adjust retry timings for kojid and kojira
2008-02-22 11:32:27 -05:00

522 lines
20 KiB
Python
Executable file

#!/usr/bin/python
# Koji Repository Administrator (kojira)
# Copyright (c) 2005-2007 Red Hat
#
# Koji is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation;
# version 2.1 of the License.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this software; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors:
# Mike McLean <mikem@redhat.com>
try:
import krbV
except ImportError:
pass
import sys
import os
import koji
from optparse import OptionParser
from ConfigParser import ConfigParser
import logging
import logging.handlers
import pprint
import signal
import time
import traceback
def safe_rmtree(path, strict=True):
    """Remove the directory tree at path without crossing filesystems.

    The work is delegated to find(1) with -xdev, so the walk will not cross
    into other filesystems (though it will cross bind mounts from the same
    filesystem). Non-directories are removed first, then the directories
    themselves, bottom-up.

    Args:
        path: root of the tree to remove.
        strict: if true, raise koji.GenericError when a removal command
            fails; otherwise log a warning and return the failing status.

    Returns:
        None if path does not exist, otherwise the os.system() status of the
        last command that was run (0 on success).

    Raises:
        koji.GenericError: a removal command failed and strict is true.
    """
    # Function-scope import so the file's import block stays untouched.
    # shlex.quote is the Python 3 home of pipes.quote.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote
    logger = logging.getLogger("koji.repo")
    if not os.path.exists(path):
        logger.debug("No such path: %s" % path)
        return
    # Wrapping the path in bare single quotes breaks the command (or injects
    # shell syntax) as soon as the path itself contains a quote, so quote it
    # properly instead.
    quoted = quote(path)
    #first rm -f non-directories
    logger.debug('Removing files under %s' % path)
    rv = os.system("find %s -xdev \\! -type d -print0 |xargs -0 rm -f" % quoted)
    if rv != 0:
        msg = 'file removal failed (code %r) for %s' % (rv,path)
        logger.warning(msg)
        if strict:
            raise koji.GenericError(msg)
        # directories cannot be empty if file removal failed, so stop here
        return rv
    #then rmdir directories
    #with -depth, we start at the bottom and work up
    logger.debug('Removing directories under %s' % path)
    rv = os.system("find %s -xdev -depth -type d -print0 |xargs -0 rmdir" % quoted)
    if rv != 0:
        msg = 'dir removal failed (code %r) for %s' % (rv,path)
        logger.warning(msg)
        if strict:
            raise koji.GenericError(msg)
    return rv
class ManagedRepo(object):
    """Local bookkeeping for a single repo reported by the hub.

    The methods rely on the module-level ``session``, ``options`` and
    ``pathinfo`` globals that the script entry point sets up.
    """

    def __init__(self, data):
        """Initialise from a repo info dict.

        data must provide the 'id', 'create_event', 'create_ts', 'tag_id'
        and 'state' keys. The tag's inheritance as of the creation event is
        fetched from the hub to build the list of tags this repo depends on.
        """
        self.logger = logging.getLogger("koji.repo")
        # cleared (permanently) once a tag change is seen, see isCurrent()
        self.current = True
        self.repo_id = data['id']
        self.event_id = data['create_event']
        self.event_ts = data['create_ts']
        self.tag_id = data['tag_id']
        self.state = data['state']
        order = session.getFullInheritance(self.tag_id, event=self.event_id)
        #order may contain same tag more than once, so collect into a dict
        tags = {self.tag_id : 1}
        for x in order:
            tags[x['parent_id']] = 1
        self.taglist = list(tags)

    def expire(self):
        """Mark the repo expired

        Does nothing if the repo is already expired.

        Raises:
            koji.GenericError: the repo has already been deleted.
        """
        if self.state == koji.REPO_EXPIRED:
            return
        elif self.state == koji.REPO_DELETED:
            # must be qualified: a bare GenericError is not defined here
            raise koji.GenericError("Repo already deleted")
        self.logger.info("Expiring repo %s.." % self.repo_id)
        session.repoExpire(self.repo_id)
        self.state = koji.REPO_EXPIRED

    def expired(self):
        """Return True if the repo is in the EXPIRED state"""
        return self.state == koji.REPO_EXPIRED

    def pending(self, timeout=180):
        """Determine if repo generation appears to be in progress and not already obsolete

        True only for a repo in the INIT state whose tags are unchanged and
        whose creation event is less than timeout seconds old.
        """
        if self.state != koji.REPO_INIT:
            return False
        age = time.time() - self.event_ts
        return self.isCurrent(ignore_state=True) and age < timeout

    def stale(self, timeout=36000):
        """Determine if repo seems stale

        By stale, we mean:
            - state=INIT
            - timestamp really, really old (more than timeout seconds)
        """
        #XXX - timeout should come from config
        if self.state != koji.REPO_INIT:
            return False
        age = time.time() - self.event_ts
        return age > timeout

    def tryDelete(self):
        """Remove the repo from disk, if possible

        Returns:
            True if the repo was marked deleted and its directory removal
            was attempted; False if it is younger than
            options.deleted_repo_lifetime, is still referenced, or its tag
            could not be looked up.

        Raises:
            koji.GenericError: the repo is old enough but not expired.
        """
        age = time.time() - self.event_ts
        if age < options.deleted_repo_lifetime:
            return False
        self.logger.debug("Attempting to delete repo %s.." % self.repo_id)
        if self.state != koji.REPO_EXPIRED:
            # must be qualified: a bare GenericError is not defined here
            raise koji.GenericError("Repo not expired")
        if session.repoDelete(self.repo_id) > 0:
            #cannot delete, we are referenced by a buildroot
            self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
            return False
        self.logger.info("Deleted repo %s" % self.repo_id)
        self.state = koji.REPO_DELETED
        # the tag name is needed to locate the repo directory on disk
        tag_info = session.getTag(self.tag_id)
        if not tag_info:
            self.logger.warning('Could not get info for tag %i, skipping delete of repo %i' %
                                (self.tag_id, self.repo_id))
            return False
        tag_name = tag_info['name']
        path = pathinfo.repo(self.repo_id, tag_name)
        safe_rmtree(path, strict=False)
        return True

    def ready(self):
        """Return True if the repo is in the READY state"""
        return self.state == koji.REPO_READY

    def deleted(self):
        """Return True if the repo is in the DELETED state"""
        return self.state == koji.REPO_DELETED

    def problem(self):
        """Return True if the repo is in the PROBLEM state"""
        return self.state == koji.REPO_PROBLEM

    def isCurrent(self, ignore_state=False):
        """Return True if no tag in the repo's tag list changed since its event

        Unless ignore_state is set, only a READY repo can be current. Once a
        change has been seen the negative answer is cached in self.current.
        """
        if not self.current:
            # no point in checking again
            return False
        if not ignore_state and self.state != koji.REPO_READY:
            #also no point in checking
            return False
        self.logger.debug("Checking for changes: %r" % self.taglist)
        if session.tagChangedSinceEvent(self.event_id,self.taglist):
            self.logger.debug("Tag data has changed since event %r" % self.event_id)
            self.current = False
        else:
            self.logger.debug("No tag changes since event %r" % self.event_id)
        return self.current
class RepoManager(object):
    """Tracks the hub's active repos and the newRepo tasks started for them.

    The methods rely on the module-level ``session``, ``options`` and
    ``pathinfo`` globals that the script entry point sets up.
    """

    def __init__(self):
        # repo_id -> ManagedRepo
        self.repos = {}
        # tag_id -> task_id of the newRepo task started for that tag
        self.tasks = {}
        self.logger = logging.getLogger("koji.repo.manager")

    def printState(self):
        """Log every tracked repo and outstanding task at debug level"""
        for repo in self.repos.values():
            self.logger.debug("repo %s: tag=%s, state=%s"
                              % (repo.repo_id, repo.tag_id, koji.REPO_STATES[repo.state]))
        for tag_id, task_id in self.tasks.items():
            self.logger.debug("task %s for tag %s" % (task_id, tag_id))

    def readCurrentRepos(self):
        """Sync self.repos with the active repos reported by the hub

        Known repos get their state refreshed; new ones are wrapped in a
        ManagedRepo and tracked.
        """
        self.logger.debug("Reading current repo data")
        repodata = session.getActiveRepos()
        self.logger.debug("Repo data: %r" % repodata)
        for data in repodata:
            repo_id = data['id']
            repo = self.repos.get(repo_id)
            if repo:
                #we're already tracking it
                if repo.state != data['state']:
                    self.logger.info('State changed for repo %s: %s -> %s'
                                     %(repo_id, koji.REPO_STATES[repo.state], koji.REPO_STATES[data['state']]))
                    repo.state = data['state']
            else:
                self.logger.info('Found repo %s, state=%s'
                                 %(repo_id, koji.REPO_STATES[data['state']]))
                self.repos[repo_id] = ManagedRepo(data)

    def pruneLocalRepos(self):
        """Scan filesystem for repos and remove any deleted ones

        Also, warn about any oddities. Stops once options.prune_batch_size
        stray repos have been removed in this call.
        """
        self.logger.debug("Scanning filesystem for repos")
        topdir = "%s/repos" % pathinfo.topdir
        count = 0
        for tag in os.listdir(topdir):
            tagdir = "%s/%s" % (topdir, tag)
            if not os.path.isdir(tagdir):
                continue
            taginfo = session.getTag(tag)
            if taginfo is None:
                self.logger.warning("Unexpected directory (no such tag): %s" % tagdir)
                continue
            for repo_id in os.listdir(tagdir):
                if count >= options.prune_batch_size:
                    #this keeps us from spending too much time on this at one time
                    return
                repodir = "%s/%s" % (tagdir, repo_id)
                if not os.path.isdir(repodir):
                    continue
                try:
                    repo_id = int(repo_id)
                except ValueError:
                    # not a repo directory name
                    continue
                if repo_id in self.repos:
                    #we're already managing it, no need to deal with it here
                    continue
                rinfo = session.repoInfo(repo_id)
                if rinfo is None:
                    try:
                        age = time.time() - os.stat(repodir).st_mtime
                    except OSError:
                        #just in case something deletes the repo out from under us
                        continue
                    # only complain once the directory has been around a while
                    if age > 36000:
                        self.logger.warning("Unexpected directory (no such repo): %s" % repodir)
                    continue
                if rinfo['tag_name'] != taginfo['name']:
                    self.logger.warning("Tag name mismatch: %s" % repodir)
                    continue
                if rinfo['state'] in (koji.REPO_DELETED, koji.REPO_PROBLEM):
                    age = time.time() - rinfo['create_ts']
                    if age > options.deleted_repo_lifetime:
                        count += 1
                        # use the instance logger; the module-global logger
                        # only exists when the file is run as a script
                        self.logger.info("Removing stray repo (state=%s): %s" % (koji.REPO_STATES[rinfo['state']], repodir))
                        safe_rmtree(repodir, strict=False)

    def updateRepos(self):
        """Run one management pass

        Reaps finished newRepo tasks, refreshes repo data, expires stale
        repos, starts newRepo tasks for build tags without a current or
        pending repo (oldest first, bounded by options.max_repo_tasks), then
        expires repos of tags that no longer need one and deletes up to
        options.delete_batch_size expired repos.
        """
        #check on tasks
        # iterate over a snapshot since entries are deleted inside the loop
        for tag_id, task_id in list(self.tasks.items()):
            tinfo = session.getTaskInfo(task_id)
            tstate = koji.TASK_STATES[tinfo['state']]
            if tstate == 'CLOSED':
                self.logger.info("Finished: newRepo task %s for tag %s" % (task_id, tag_id))
                del self.tasks[tag_id]
            elif tstate in ('CANCELED', 'FAILED'):
                self.logger.info("Problem: newRepo task %s for tag %s is %s" % (task_id, tag_id, tstate))
                del self.tasks[tag_id]
            #TODO [?] - implement a timeout for active tasks?
        self.logger.debug("Current tasks: %r" % self.tasks)
        if len(self.tasks) >= options.max_repo_tasks:
            self.logger.info("Maximum number of repo tasks reached.")
            return
        self.logger.debug("Updating repos")
        self.readCurrentRepos()
        #check for stale repos
        for repo in list(self.repos.values()):
            if repo.stale():
                repo.expire()
        #find out which tags require repos
        tags = {}
        for target in session.getBuildTargets():
            tag_id = target['build_tag']
            tags[tag_id] = 1
        #index repos by tag
        tag_repos = {}
        for repo in self.repos.values():
            tag_repos.setdefault(repo.tag_id, []).append(repo)
        self.logger.debug("Needed tags: %r" % list(tags))
        self.logger.debug("Current tags: %r" % list(tag_repos))

        #we need to determine:
        #  - which tags need a new repo
        #  - if any repos seem to be broken
        regen = []
        for tag_id in tags:
            covered = False
            for repo in tag_repos.get(tag_id,[]):
                if repo.isCurrent():
                    covered = True
                    break
                elif repo.pending():
                    #one on the way
                    covered = True
                    break
            if covered:
                continue
            if tag_id in self.tasks:
                #repo creation in progress
                #TODO - implement a timeout
                continue
            #tag still appears to be uncovered
            #figure out how old existing repo is
            ts = 0
            for repo in tag_repos.get(tag_id, []):
                if repo.event_ts > ts:
                    ts = repo.event_ts
            regen.append((ts, tag_id))
        regen.sort()
        self.logger.debug("order: %s", regen)
        # i.e. tags with oldest (or no) repos get precedence
        for ts, tag_id in regen:
            if len(self.tasks) >= options.max_repo_tasks:
                self.logger.info("Maximum number of repo tasks reached.")
                break
            task_id = session.newRepo(tag_id)
            self.logger.info("Created newRepo task %s for tag %s" % (task_id, tag_id))
            self.tasks[tag_id] = task_id
        #some cleanup
        n_deletes = 0
        for tag_id, repolist in tag_repos.items():
            if tag_id not in tags:
                #repos for these tags are no longer required
                for repo in repolist:
                    if repo.ready():
                        repo.expire()
            for repo in repolist:
                if n_deletes >= options.delete_batch_size:
                    break
                if repo.expired():
                    #try to delete
                    if repo.tryDelete():
                        n_deletes += 1
def main():
    """Run the repo manager until interrupted or the session expires.

    After an initial read and prune, each pass updates repos, logs state and
    prunes stray directories, then sleeps five seconds. Unexpected errors are
    logged and the loop carries on. On the way out the session is logged out
    and the process exits.
    """
    manager = RepoManager()
    manager.readCurrentRepos()
    manager.pruneLocalRepos()
    logger.info("Entering main loop")
    while True:
        try:
            manager.updateRepos()
            manager.printState()
            manager.pruneLocalRepos()
        except KeyboardInterrupt:
            logger.warning("User exit")
            break
        except koji.AuthExpired:
            logger.warning("Session expired")
            break
        except SystemExit:
            logger.warning("Shutting down")
            break
        except:
            # anything else: record the traceback and keep going
            logger.error(traceback.format_exc())
        # pause between passes; Ctrl-C during the pause also ends the loop
        try:
            time.sleep(5)
        except KeyboardInterrupt:
            logger.warning("User exit")
            break
    # exit even if the logout itself fails
    try:
        session.logout()
    finally:
        sys.exit()
def _exit_signal_handler(signum, frame):
    """Signal handler: log the event, log out of the hub, exit with status 1.

    signum and frame are the standard handler arguments; neither is used.
    """
    logger.error('Exiting on signal')
    # close the hub session before leaving
    session.logout()
    sys.exit(1)
def get_options():
    """Process options from the command line and the config file.

    Precedence is: command line, then the [kojira] section of the config
    file, then built-in defaults. Any other config section, or an unknown
    option name, ends the program through quit(). A logfile value of '',
    'None' or 'none' is turned into None.

    Returns:
        The optparse options object with every default filled in.
    """
    parser = OptionParser("usage: %prog [opts]")
    add = parser.add_option
    add("-c", "--config", dest="configFile",
        help="use alternate configuration file", metavar="FILE",
        default="/etc/kojira/kojira.conf")
    add("--user", help="specify user")
    add("--password", help="specify password")
    add("--principal", help="Kerberos principal")
    add("--keytab", help="Kerberos keytab")
    add("-f", "--fg", dest="daemon",
        action="store_false", default=True,
        help="run in foreground")
    add("-d", "--debug", action="store_true",
        help="show debug output")
    add("-v", "--verbose", action="store_true",
        help="show verbose output")
    add("--with-src", action="store_true",
        help="include srpms in repos")
    add("--force-lock", action="store_true", default=False,
        help="force lock for exclusive session")
    add("--debug-xmlrpc", action="store_true", default=False,
        help="show xmlrpc debug output")
    add("--skip-main", action="store_true", default=False,
        help="don't actually run main")
    add("--show-config", action="store_true", default=False,
        help="Show config and exit")
    add("-s", "--server", help="URL of XMLRPC server")
    add("--topdir", help="Specify topdir")
    add("--logfile", help="Specify logfile")
    options, _args = parser.parse_args()

    section = 'kojira'
    config = ConfigParser()
    config.read(options.configFile)
    # only the [kojira] section is allowed in the config file
    for found in config.sections():
        if found != section:
            quit('invalid section found in config file: %s' % found)

    defaults = {
        'with_src': False,
        'verbose': False,
        'debug': False,
        'topdir': '/mnt/koji',
        'server': None,
        'logfile': '/var/log/kojira.log',
        'principal': None,
        'keytab': None,
        'retry_interval': 60,
        'max_retries': 120,
        'offline_retry': True,
        'offline_retry_interval': 120,
        'prune_batch_size': 4,
        'delete_batch_size': 3,
        'max_repo_tasks' : 10,
        'deleted_repo_lifetime': 7*24*3600,
        'cert': '/etc/kojira/client.crt',
        'ca': '/etc/kojira/clientca.crt',
        'serverca': '/etc/kojira/serverca.crt',
    }

    # map each recognised option name to the typed getter that reads it
    readers = {}
    for opt in ('prune_batch_size', 'deleted_repo_lifetime', 'max_repo_tasks',
                'delete_batch_size', 'retry_interval', 'max_retries',
                'offline_retry_interval'):
        readers[opt] = config.getint
    for opt in ('topdir', 'server', 'user', 'password', 'logfile',
                'principal', 'keytab', 'cert', 'ca', 'serverca'):
        readers[opt] = config.get
    for opt in ('with_src', 'verbose', 'debug', 'offline_retry'):
        readers[opt] = config.getboolean

    if config.has_section(section):
        for name in config.options(section):
            if name in readers:
                defaults[name] = readers[name](section, name)
            else:
                quit("unknown config option: %s" % name)

    # anything the command line left unset falls back to config/defaults
    for name in defaults:
        if getattr(options, name, None) is None:
            setattr(options, name, defaults[name])
    if options.logfile in ('', 'None', 'none'):
        options.logfile = None
    return options
def quit(msg=None, code=1):
    """Exit the program with status code, reporting msg first if given.

    A non-empty msg is logged as an error on the "koji.repo" logger and also
    written to stderr.
    """
    if msg:
        err = sys.stderr
        logging.getLogger("koji.repo").error(msg)
        err.write('%s\n' % msg)
        err.flush()
    sys.exit(code)
if __name__ == "__main__":
    # Script entry point: set up the module-level globals (options, pathinfo,
    # logger, session) that the rest of the file relies on, then run main().

    options = get_options()
    topdir = getattr(options,'topdir',None)
    pathinfo = koji.PathInfo(topdir)
    if options.show_config:
        pprint.pprint(options.__dict__)
        sys.exit()
    if options.logfile:
        if not os.path.exists(options.logfile):
            # create the logfile up front so problems are reported clearly
            try:
                logfile = open(options.logfile, "w")
                logfile.close()
            except (IOError, OSError):
                sys.stderr.write("Cannot create logfile: %s\n" % options.logfile)
                sys.exit(1)
        if not os.access(options.logfile,os.W_OK):
            sys.stderr.write("Cannot write to logfile: %s\n" % options.logfile)
            sys.exit(1)
        koji.add_file_logger("koji", options.logfile)
    #note we're setting logging for koji.*
    logger = logging.getLogger("koji")
    if options.debug:
        logger.setLevel(logging.DEBUG)
    elif options.verbose:
        logger.setLevel(logging.INFO)
    else:
        logger.setLevel(logging.WARNING)
    # pass the connection-related settings through to the client session
    session_opts = {}
    for k in ('user', 'password', 'debug_xmlrpc', 'debug',
              'retry_interval', 'max_retries', 'offline_retry', 'offline_retry_interval'):
        session_opts[k] = getattr(options,k)
    session = koji.ClientSession(options.server,session_opts)
    if os.path.isfile(options.cert):
        # authenticate using SSL client certificates
        session.ssl_login(options.cert, options.ca, options.serverca)
    elif options.user:
        # authenticate using user/password
        session.login()
    elif 'krbV' in sys.modules and options.principal and options.keytab:
        # krbV is an optional import; only use Kerberos if it loaded
        session.krb_login(options.principal, options.keytab)
    #get an exclusive session
    try:
        session.exclusiveSession(force=options.force_lock)
    except koji.AuthLockError:
        quit("Error: Unable to get lock. Try using --force-lock")
    if not session.logged_in:
        quit("Error: Unknown login error")
    if options.skip_main:
        sys.exit()
    elif options.daemon:
        koji.daemonize()
    else:
        koji.add_stderr_logger("koji")
    main()