1367 lines
54 KiB
Python
Executable file
1367 lines
54 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
# Koji Repository Administrator (kojira)
|
|
# Copyright (c) 2005-2014 Red Hat, Inc.
|
|
#
|
|
# Koji is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation;
|
|
# version 2.1 of the License.
|
|
#
|
|
# This software is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this software; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
#
|
|
# Authors:
|
|
# Mike McLean <mikem@redhat.com>
|
|
|
|
import errno
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import pprint
|
|
import signal
|
|
import stat
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from optparse import OptionParser
|
|
from xml.etree import ElementTree
|
|
|
|
import requests
|
|
|
|
import koji
|
|
from koji.util import deprecated, parseStatus, rmtree, to_list
|
|
|
|
tag_cache = {}
|
|
|
|
|
|
def prefillTagCache(session):
|
|
tags = session.listTags()
|
|
now = time.time()
|
|
for tag in tags:
|
|
tag_cache[(tag['id'], None)] = (now, tag)
|
|
tag_cache[(tag['name'], None)] = (now, tag)
|
|
|
|
|
|
def getTag(session, tag, event=None):
|
|
"""A caching version of the hub call"""
|
|
cache = tag_cache
|
|
now = time.time()
|
|
if (tag, event) in cache:
|
|
ts, info = cache[(tag, event)]
|
|
if now - ts < 600:
|
|
# use the cache
|
|
return info
|
|
info = session.getTag(tag, event=event)
|
|
if info:
|
|
cache[(info['id'], event)] = (now, info)
|
|
cache[(info['name'], event)] = (now, info)
|
|
return info
|
|
|
|
|
|
class ManagedRepo(object):
|
|
|
|
def __init__(self, manager, data, repodata=None):
|
|
self.manager = manager
|
|
self.options = manager.options
|
|
self.logger = logging.getLogger("koji.repo")
|
|
self.current = True
|
|
self.repo_id = data['id']
|
|
self.event_id = data['create_event']
|
|
self.event_ts = data['create_ts']
|
|
self.tag_id = data['tag_id']
|
|
self.state = data['state']
|
|
if 'dist' in data:
|
|
self._dist = data['dist']
|
|
self.tag_name = data['tag_name']
|
|
self.expire_ts = None
|
|
if koji.REPO_STATES[self.state] in ['EXPIRED', 'DELETED', 'PROBLEM']:
|
|
self.current = False
|
|
self._find_expire_time(repodata)
|
|
# TODO use hub data to find the actual expiration time
|
|
self.first_seen = time.time()
|
|
self._taglist = None
|
|
|
|
@property
|
|
def session(self):
|
|
# return actual thread session object
|
|
return self.manager.session
|
|
|
|
@property
|
|
def taglist(self):
|
|
if not self._taglist:
|
|
order = self.session.getFullInheritance(self.tag_id, event=self.event_id)
|
|
# order may contain same tag more than once
|
|
tags = {self.tag_id: 1}
|
|
for x in order:
|
|
tags[x['parent_id']] = 1
|
|
self._taglist = to_list(tags.keys())
|
|
return self._taglist
|
|
|
|
def _find_expire_time(self, repodata):
|
|
# find all newer repos for same tag and set oldest as expire_ts for our repo
|
|
if repodata:
|
|
repos = [r for r in repodata if
|
|
r['tag_id'] == self.tag_id and r['create_event'] > self.event_id]
|
|
if repos:
|
|
invalidated_by = sorted(repos, key=lambda x: x['create_event'])
|
|
self.expire_ts = invalidated_by[0]['create_ts']
|
|
if not self.expire_ts:
|
|
self.expire_ts = time.time()
|
|
|
|
@property
|
|
def dist(self):
|
|
# TODO: remove this indirection once we can rely on the hub to return
|
|
# dist field in getActiveRepos
|
|
if hasattr(self, '_dist'):
|
|
return self._dist
|
|
rinfo = self.session.repoInfo(self.repo_id)
|
|
self._dist = rinfo['dist']
|
|
|
|
def get_info(self):
|
|
"Fetch data from repo.json"
|
|
path = self.get_path()
|
|
if not path:
|
|
# can this be an error yet?
|
|
return None
|
|
fn = '%s/repo.json' % path
|
|
if not os.path.exists(fn):
|
|
self.logger.warning('Repo info file missing: %s', fn)
|
|
return None
|
|
return koji.load_json(fn)
|
|
|
|
def get_path(self, volume=None):
|
|
"""Return the path to the repo directory"""
|
|
tag_info = getTag(self.session, self.tag_id)
|
|
if not tag_info:
|
|
tag_info = getTag(self.session, self.tag_id, self.event_id)
|
|
if not tag_info:
|
|
self.logger.warning('Could not get info for tag %i, referenced by repo %i' %
|
|
(self.tag_id, self.repo_id))
|
|
return None
|
|
tag_name = tag_info['name']
|
|
if self.dist:
|
|
path = pathinfo.distrepo(self.repo_id, tag_name, volume=volume)
|
|
else:
|
|
# currently only dist repos can be on another volume
|
|
path = pathinfo.repo(self.repo_id, tag_name)
|
|
return path
|
|
|
|
def expire(self):
|
|
"""Mark the repo expired"""
|
|
if self.state == koji.REPO_EXPIRED:
|
|
return
|
|
elif self.state == koji.REPO_DELETED:
|
|
raise koji.GenericError("Repo already deleted")
|
|
self.logger.info("Expiring repo %s.." % self.repo_id)
|
|
self.session.repoExpire(self.repo_id)
|
|
self.state = koji.REPO_EXPIRED
|
|
|
|
def expired(self):
|
|
return self.state == koji.REPO_EXPIRED
|
|
|
|
def pending(self, timeout=180):
|
|
"""Determine if repo generation appears to be in progress and not already obsolete"""
|
|
if self.state != koji.REPO_INIT:
|
|
return False
|
|
age = time.time() - self.event_ts
|
|
return self.current and age < timeout
|
|
|
|
def stale(self):
|
|
"""Determine if repo seems stale
|
|
|
|
By stale, we mean:
|
|
- state=INIT
|
|
- timestamp really, really old
|
|
"""
|
|
timeout = 36000
|
|
# XXX - config
|
|
if self.state != koji.REPO_INIT:
|
|
return False
|
|
times = [self.event_ts]
|
|
# the mtime is also factored in because a repo can be
|
|
# created from an older event and should not be expired based solely on
|
|
# that event's timestamp.
|
|
path = self.get_path()
|
|
if os.path.exists(path):
|
|
try:
|
|
times.append(os.stat(path).st_mtime)
|
|
except Exception:
|
|
self.logger.error("Can't read mtime for %s" % path)
|
|
return False
|
|
else:
|
|
times.append(self.first_seen)
|
|
self.logger.warning("Repo %d is in INIT state, "
|
|
"but doesn't have directory %s yet?" % (self.repo_id, path))
|
|
age = time.time() - max(times)
|
|
return age > timeout
|
|
|
|
def tryDelete(self, logger):
|
|
"""Remove the repo from disk, if possible"""
|
|
path = self.get_path()
|
|
if not path:
|
|
# get_path already warned
|
|
return False
|
|
if self.dist:
|
|
lifetime = self.options.dist_repo_lifetime
|
|
else:
|
|
lifetime = self.options.deleted_repo_lifetime
|
|
# (should really be called expired_repo_lifetime)
|
|
try:
|
|
# also check dir age. We do this because a repo can be created from an older event
|
|
# and should not be removed based solely on that event's timestamp.
|
|
mtime = os.stat(path).st_mtime
|
|
except OSError as e:
|
|
if e.errno == 2:
|
|
# No such file or directory, so the repo either never existed,
|
|
# or has already been deleted, so allow it to be marked deleted.
|
|
logger.info("Repo directory does not exist: %s" % path)
|
|
pass
|
|
else:
|
|
logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))
|
|
return False
|
|
else:
|
|
times = [self.event_ts, mtime, self.expire_ts]
|
|
times = [ts for ts in times if ts is not None]
|
|
age = time.time() - max(times)
|
|
logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)
|
|
if age < lifetime:
|
|
return False
|
|
logger.debug("Attempting to delete repo %s.." % self.repo_id)
|
|
if self.state != koji.REPO_EXPIRED:
|
|
raise koji.GenericError("Repo not expired")
|
|
if self.session.repoDelete(self.repo_id) > 0:
|
|
# cannot delete, we are referenced by a buildroot
|
|
logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
|
|
return False
|
|
logger.info("Deleted repo %s" % self.repo_id)
|
|
self.state = koji.REPO_DELETED
|
|
if os.path.islink(path):
|
|
# expected for repos on other volumes
|
|
info = self.get_info()
|
|
if not os.path.exists(path):
|
|
logger.error('Repo volume link broken: %s', path)
|
|
return False
|
|
if not info or 'volume' not in info:
|
|
logger.error('Missing repo.json in %s', path)
|
|
return False
|
|
realpath = self.get_path(volume=info['volume'])
|
|
if not os.path.exists(realpath):
|
|
logger.error('Repo real path missing: %s', realpath)
|
|
return False
|
|
if self.options.ignore_other_volumes:
|
|
# don't delete from other volumes
|
|
logger.error('Repo on non-default volume %s', realpath)
|
|
return False
|
|
if not os.path.samefile(path, realpath):
|
|
logger.error('Incorrect volume link: %s', path)
|
|
return False
|
|
# ok, try to remove the symlink
|
|
try:
|
|
os.unlink(path)
|
|
except OSError:
|
|
logger.error('Unable to remove volume link: %s', path)
|
|
else:
|
|
realpath = path
|
|
|
|
self.manager.rmtree(realpath)
|
|
|
|
return True
|
|
|
|
def ready(self):
|
|
return self.state == koji.REPO_READY
|
|
|
|
def deleted(self):
|
|
return self.state == koji.REPO_DELETED
|
|
|
|
def problem(self):
|
|
return self.state == koji.REPO_PROBLEM
|
|
|
|
|
|
class LoggingLockManager(object):
|
|
"""A context manager that acquires all the logging locks"""
|
|
|
|
# This lock business is a workaround for https://pagure.io/koji/issue/2714
|
|
|
|
def __enter__(self):
|
|
self.acquire_locks()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, traceback):
|
|
# we want to free the locks regardless of what happend inside the with statement
|
|
self.release_locks()
|
|
return False # don't eat exceptions
|
|
|
|
def acquire_locks(self):
|
|
# init
|
|
self.locks = []
|
|
self.module_lock = False
|
|
toplogger = logging.getLogger('koji')
|
|
|
|
# module level lock
|
|
if hasattr(logging, '_acquireLock'):
|
|
logging._acquireLock()
|
|
self.module_lock = True
|
|
|
|
# also each handler can have its own lock
|
|
# in kojira, the we should only have handlers attached to the koji logger
|
|
self.locks = [h.lock for h in toplogger.handlers if h.lock]
|
|
for lock in self.locks:
|
|
lock.acquire()
|
|
|
|
def release_locks(self):
|
|
# Only parent process should have locked locks in 3.9+, child ones will
|
|
# be reinitilized to free state
|
|
# In older pythons, state could be random (and no module_lock)
|
|
if self.module_lock:
|
|
try:
|
|
logging._releaseLock()
|
|
except RuntimeError:
|
|
pass
|
|
for lock in self.locks:
|
|
try:
|
|
lock.release()
|
|
except RuntimeError:
|
|
pass
|
|
|
|
|
|
class RepoManager(object):
|
|
|
|
def __init__(self, options, session):
|
|
self.options = options
|
|
self._local = threading.local()
|
|
self._local.session = session
|
|
self.repos = {}
|
|
self.external_repo_ts = {}
|
|
self.tasks = {}
|
|
self.other_tasks = {}
|
|
self.needed_tags = {}
|
|
self.tag_use_stats = {}
|
|
self.delete_pids = {}
|
|
self.delete_queue = []
|
|
self.logger = logging.getLogger("koji.repo.manager")
|
|
prefillTagCache(session)
|
|
|
|
@property
|
|
def session(self):
|
|
# session is stored in our threadlocal instance
|
|
return self._local.session
|
|
|
|
@session.setter
|
|
def session(self, value):
|
|
self._local.session = value
|
|
|
|
def printState(self):
|
|
self.logger.debug('Tracking %i repos, %i child processes',
|
|
len(self.repos), len(self.delete_pids))
|
|
for tag_id, task_id in self.tasks.items():
|
|
self.logger.debug("Tracking task %s for tag %s", task_id, tag_id)
|
|
for pid, desc in self.delete_pids.items():
|
|
self.logger.debug("Delete job %s: %r", pid, desc)
|
|
|
|
def rmtree(self, path):
|
|
"""Spawn (or queue) and rmtree job"""
|
|
self.logger.info("Queuing rmtree job for %s", path)
|
|
if path not in self.delete_queue:
|
|
self.delete_queue.append(path)
|
|
|
|
def checkQueue(self):
|
|
finished = [pid for pid in self.delete_pids if self.waitPid(pid)]
|
|
for pid in finished:
|
|
path = self.delete_pids[pid]
|
|
self.logger.info("Completed rmtree job for %s", path)
|
|
del self.delete_pids[pid]
|
|
while self.delete_queue and len(self.delete_pids) <= self.options.max_delete_processes:
|
|
path = self.delete_queue.pop(0)
|
|
pid = self._rmtree(path)
|
|
self.logger.info("Started rmtree (pid %i) for %s", pid, path)
|
|
self.delete_pids[pid] = path
|
|
|
|
def waitPid(self, pid):
|
|
# XXX - can we unify with TaskManager?
|
|
prefix = "pid %i (%s)" % (pid, self.delete_pids.get(pid))
|
|
try:
|
|
(childpid, status) = os.waitpid(pid, os.WNOHANG)
|
|
except OSError as e:
|
|
if e.errno != errno.ECHILD:
|
|
# should not happen
|
|
raise
|
|
# otherwise assume the process is gone
|
|
self.logger.info("%s: %s" % (prefix, e))
|
|
return True
|
|
if childpid != 0:
|
|
self.logger.info(parseStatus(status, prefix))
|
|
return True
|
|
return False
|
|
|
|
def _rmtree(self, path):
|
|
with LoggingLockManager():
|
|
pid = os.fork()
|
|
if pid:
|
|
return pid
|
|
# no return
|
|
try:
|
|
status = 1
|
|
self.session._forget()
|
|
try:
|
|
rmtree(path)
|
|
status = 0
|
|
except BaseException:
|
|
logger.error(''.join(traceback.format_exception(*sys.exc_info())))
|
|
logging.shutdown()
|
|
finally:
|
|
os._exit(status)
|
|
|
|
def killChildren(self):
|
|
# XXX - unify with TaskManager?
|
|
sig = signal.SIGTERM
|
|
for pid in self.delete_pids:
|
|
try:
|
|
os.kill(pid, sig)
|
|
except OSError as e:
|
|
if e.errno != errno.ESRCH:
|
|
logger.error("Unable to kill process %s", pid)
|
|
|
|
def readCurrentRepos(self):
|
|
self.logger.debug("Reading current repo data")
|
|
repodata = self.session.getActiveRepos()
|
|
self.logger.debug("Repo data: %r" % repodata)
|
|
self.logger.debug("Preloaded %d tags" % len(tag_cache.keys()))
|
|
|
|
for data in repodata:
|
|
repo_id = data['id']
|
|
repo = self.repos.get(repo_id)
|
|
if repo:
|
|
# we're already tracking it
|
|
if repo.state != data['state']:
|
|
self.logger.info(
|
|
'State changed for repo %s: %s -> %s',
|
|
repo_id, koji.REPO_STATES[repo.state], koji.REPO_STATES[data['state']])
|
|
repo.state = data['state']
|
|
else:
|
|
self.logger.info('Found repo %s, state=%s'
|
|
% (repo_id, koji.REPO_STATES[data['state']]))
|
|
repo = ManagedRepo(self, data, repodata)
|
|
self.repos[repo_id] = repo
|
|
if not getTag(self.session, repo.tag_id) and not repo.expired():
|
|
self.logger.info('Tag %d for repo %d disappeared, expiring.', repo.tag_id, repo_id)
|
|
repo.expire()
|
|
if len(self.repos) > len(repodata):
|
|
# This shouldn't normally happen, but might if someone else calls
|
|
# repoDelete or similar
|
|
active = set([r['id'] for r in repodata])
|
|
for repo_id in to_list(self.repos.keys()):
|
|
if repo_id not in active:
|
|
self.logger.info('Dropping entry for inactive repo: %s', repo_id)
|
|
del self.repos[repo_id]
|
|
|
|
def checkExternalRepo(self, ts, repodata, tag):
|
|
"""Determine which external repos are current, return True if remote repo is newer"""
|
|
url = repodata['url']
|
|
|
|
# expand the arch urls if needed
|
|
expanded_urls = [url]
|
|
if '$arch' in url:
|
|
taginfo = getTag(self.session, tag)
|
|
if not taginfo:
|
|
self.logger.error('Invalid tag for external repo: %s', tag)
|
|
return False
|
|
arches = (taginfo.get('arches', '') or '').split()
|
|
if not arches:
|
|
self.logger.warning('Tag with external repo lacks arches: %(name)s', taginfo)
|
|
return False
|
|
expanded_urls = [url.replace('$arch', a) for a in arches]
|
|
|
|
# find latest timestamp across expanded urls
|
|
max_ts = 0
|
|
for arch_url in expanded_urls:
|
|
arch_url = os.path.join(arch_url, 'repodata/repomd.xml')
|
|
if arch_url in self.external_repo_ts:
|
|
# just use the cache
|
|
max_ts = max(max_ts, self.external_repo_ts[arch_url])
|
|
continue
|
|
self.logger.debug('Checking external url: %s' % arch_url)
|
|
try:
|
|
r = requests.get(arch_url, timeout=5)
|
|
root = ElementTree.fromstring(r.text) # nosec
|
|
ts_elements = root.iter('{http://linux.duke.edu/metadata/repo}timestamp')
|
|
arch_ts = max([round(float(child.text)) for child in ts_elements])
|
|
self.external_repo_ts[arch_url] = arch_ts
|
|
max_ts = max(max_ts, arch_ts)
|
|
except Exception:
|
|
# inaccessible or without timestamps
|
|
# treat repo as unchanged (ts = 0)
|
|
self.logger.warning('Unable to read timestamp for external repo: %s', arch_url)
|
|
self.external_repo_ts[arch_url] = 0
|
|
pass
|
|
|
|
return ts < max_ts
|
|
|
|
def reposToCheck(self):
|
|
to_check = []
|
|
repo_ids = to_list(self.repos.keys())
|
|
for repo_id in repo_ids:
|
|
repo = self.repos.get(repo_id)
|
|
if repo is None:
|
|
# removed by main thread
|
|
continue
|
|
if not repo.current:
|
|
# no point in checking again
|
|
continue
|
|
if repo.state not in (koji.REPO_READY, koji.REPO_INIT):
|
|
repo.current = False
|
|
if repo.expire_ts is None:
|
|
repo.expire_ts = time.time()
|
|
# also no point in further checking
|
|
continue
|
|
to_check.append(repo)
|
|
if self.logger.isEnabledFor(logging.DEBUG):
|
|
skipped = set(repo_ids).difference([r.repo_id for r in to_check])
|
|
self.logger.debug("Skipped check for repos: %r", skipped)
|
|
return to_check
|
|
|
|
def checkExternalRepos(self):
|
|
"""Determine which external repos changed"""
|
|
# clean external repo cache
|
|
self.external_repo_ts = {}
|
|
for repo in self.reposToCheck():
|
|
changed = False
|
|
for tag in repo.taglist:
|
|
try:
|
|
external_repos = self.session.getExternalRepoList(tag)
|
|
except koji.GenericError:
|
|
# in case tag was deleted, checkCurrentRepos is
|
|
# responsible for cleanup, ignore it here
|
|
external_repos = []
|
|
for external_repo in external_repos:
|
|
changed = self.checkExternalRepo(repo.event_ts, external_repo, tag)
|
|
self.logger.debug("Check external repo %s [%s] for tag %s: %s" % (
|
|
external_repo['external_repo_id'], external_repo['url'],
|
|
tag, changed))
|
|
if changed:
|
|
break
|
|
if changed:
|
|
break
|
|
if changed:
|
|
self.logger.info("Repo %i no longer current due to external repo change" %
|
|
repo.repo_id)
|
|
repo.current = False
|
|
repo.expire_ts = time.time()
|
|
|
|
def checkCurrentRepos(self):
|
|
"""Determine which repos are current"""
|
|
for repo in self.reposToCheck():
|
|
if self.session.tagChangedSinceEvent(repo.event_id, repo.taglist):
|
|
self.logger.info("Repo %i no longer current", repo.repo_id)
|
|
repo.current = False
|
|
repo.expire_ts = time.time()
|
|
|
|
def currencyChecker(self, session):
|
|
"""Continually checks repos for currency. Runs as a separate thread"""
|
|
self.session = session
|
|
self.logger = logging.getLogger("koji.repo.currency")
|
|
self.logger.info('currencyChecker starting')
|
|
try:
|
|
while True:
|
|
self.checkCurrentRepos()
|
|
time.sleep(self.options.sleeptime)
|
|
except Exception:
|
|
self.logger.exception('Error in currency checker thread')
|
|
raise
|
|
finally:
|
|
session.logout()
|
|
|
|
def currencyExternalChecker(self, session):
|
|
"""Continually checks repos for external repo currency. Runs as a separate thread"""
|
|
self.session = session
|
|
self.logger = logging.getLogger("koji.repo.currency_external")
|
|
self.logger.info('currencyExternalChecker starting')
|
|
try:
|
|
while True:
|
|
self.checkExternalRepos()
|
|
time.sleep(self.options.sleeptime)
|
|
except Exception:
|
|
self.logger.exception('Error in external currency checker thread')
|
|
raise
|
|
finally:
|
|
session.logout()
|
|
|
|
def regenLoop(self, session):
|
|
"""Triggers regens as needed/possible. Runs in a separate thread"""
|
|
self.session = session
|
|
self.logger = logging.getLogger("koji.repo.regen")
|
|
self.logger.info('regenLoop starting')
|
|
try:
|
|
while True:
|
|
self.regenRepos()
|
|
time.sleep(self.options.sleeptime)
|
|
except Exception:
|
|
self.logger.exception('Error in regen thread')
|
|
raise
|
|
finally:
|
|
session.logout()
|
|
|
|
def deleteLoop(self, session):
|
|
"""Triggers regens as needed/possible. Runs in a separate thread"""
|
|
self.session = session
|
|
self.delete_logger = logging.getLogger("koji.repo.delete")
|
|
self.delete_logger.info('deleteLoop starting')
|
|
try:
|
|
while True:
|
|
self.deleteRepos()
|
|
time.sleep(self.options.sleeptime)
|
|
except Exception:
|
|
self.delete_logger.exception('Error in delete thread')
|
|
raise
|
|
finally:
|
|
session.logout()
|
|
|
|
def rmtreeLoop(self, session):
|
|
self.session = session
|
|
logger = logging.getLogger("koji.repo.rmtree")
|
|
try:
|
|
while True:
|
|
logger.debug('queue length: %d', len(self.delete_queue))
|
|
self.checkQueue()
|
|
time.sleep(self.options.sleeptime)
|
|
except Exception:
|
|
logger.exception('Error in delete thread')
|
|
raise
|
|
finally:
|
|
session.logout()
|
|
|
|
def pruneLocalRepos(self):
|
|
volname = 'DEFAULT'
|
|
volumedir = pathinfo.volumedir(volname)
|
|
repodir = "%s/repos" % volumedir
|
|
self._pruneLocalRepos(repodir, self.options.deleted_repo_lifetime)
|
|
|
|
for volinfo in self.session.listVolumes():
|
|
volname = volinfo['name']
|
|
if volname == 'DEFAULT':
|
|
continue
|
|
if self.options.ignore_other_volumes:
|
|
# don't prune from other volumes
|
|
continue
|
|
volumedir = pathinfo.volumedir(volname)
|
|
distrepodir = "%s/repos-dist" % volumedir
|
|
self._pruneLocalRepos(distrepodir, self.options.dist_repo_lifetime)
|
|
|
|
def _pruneLocalRepos(self, topdir, max_age):
|
|
"""Scan filesystem for repos and remove any deleted ones
|
|
|
|
Also, warn about any oddities"""
|
|
if self.delete_pids:
|
|
# skip
|
|
return
|
|
if not os.path.exists(topdir):
|
|
self.logger.debug("%s doesn't exist, skipping", topdir)
|
|
return
|
|
if not os.path.isdir(topdir):
|
|
self.logger.warning("%s is not directory, skipping", topdir)
|
|
return
|
|
self.logger.debug("Scanning %s for repos", topdir)
|
|
self.logger.debug('max age allowed: %s seconds', max_age)
|
|
for tag in os.listdir(topdir):
|
|
tagdir = "%s/%s" % (topdir, tag)
|
|
if not os.path.isdir(tagdir):
|
|
self.logger.debug("%s is not a directory, skipping", tagdir)
|
|
continue
|
|
|
|
# Remember where latest symlink points to (if exists)
|
|
repo_latest_path = "%s/latest" % tagdir
|
|
repo_latest_id = (
|
|
os.readlink(repo_latest_path)
|
|
if os.path.isdir(repo_latest_path)
|
|
else None
|
|
)
|
|
for repo_id in os.listdir(tagdir):
|
|
if repo_id == 'latest' or repo_id == repo_latest_id:
|
|
# ignore latest symlinks or repo where one points to
|
|
self.logger.debug("%s is latest symlink or repo, skipping", tagdir)
|
|
continue
|
|
try:
|
|
repo_id = int(repo_id)
|
|
except ValueError:
|
|
self.logger.debug("%s/%s not an int, skipping", tagdir, repo_id)
|
|
continue
|
|
if repo_id in self.repos:
|
|
# we're already managing it, no need to deal with it here
|
|
continue
|
|
repodir = "%s/%s" % (tagdir, repo_id)
|
|
try:
|
|
# lstat because it could be link to another volume
|
|
dirstat = os.lstat(repodir)
|
|
except OSError:
|
|
# just in case something deletes the repo out from under us
|
|
self.logger.debug("%s deleted already?!", repodir)
|
|
continue
|
|
symlink = False
|
|
if stat.S_ISLNK(dirstat.st_mode):
|
|
symlink = True
|
|
elif not stat.S_ISDIR(dirstat.st_mode):
|
|
self.logger.debug("%s not a directory, skipping", repodir)
|
|
continue
|
|
dir_ts = dirstat.st_mtime
|
|
rinfo = self.session.repoInfo(repo_id)
|
|
if rinfo is None:
|
|
if not self.options.ignore_stray_repos:
|
|
age = time.time() - dir_ts
|
|
self.logger.debug("did not expect %s; age: %s",
|
|
repodir, age)
|
|
if age > max_age:
|
|
self.logger.info(
|
|
"Removing unexpected directory (no such repo): %s", repodir)
|
|
if symlink:
|
|
os.unlink(repodir)
|
|
else:
|
|
self.rmtree(repodir)
|
|
continue
|
|
if rinfo['tag_name'] != tag:
|
|
try:
|
|
# possible rename of tag, repo.json should exist and contain tag id
|
|
repo_json = koji.load_json(os.path.join(repodir, 'repo.json'))
|
|
if rinfo['tag_id'] != repo_json['tag_id']:
|
|
self.logger.warning(
|
|
"Tag name/id mismatch: directory: %s, name: %s, id: %s",
|
|
tag, rinfo['tag_name'], repo_json['tag_id'])
|
|
continue
|
|
except Exception:
|
|
self.logger.warning(
|
|
"Tag name mismatch (rename?): %s vs %s", tag, rinfo['tag_name'])
|
|
continue
|
|
if rinfo['state'] in (koji.REPO_DELETED, koji.REPO_PROBLEM):
|
|
age = time.time() - max(rinfo['create_ts'], dir_ts)
|
|
self.logger.debug("potential removal candidate: %s; age: %s" % (repodir, age))
|
|
if age > max_age:
|
|
logger.info("Removing stray repo (state=%s): %s",
|
|
koji.REPO_STATES[rinfo['state']], repodir)
|
|
if symlink:
|
|
os.unlink(repodir)
|
|
else:
|
|
self.rmtree(repodir)
|
|
|
|
def tagUseStats(self, tag_id):
|
|
stats = self.tag_use_stats.get(tag_id)
|
|
now = time.time()
|
|
if stats and now - stats['ts'] < 3600:
|
|
# use the cache
|
|
return stats
|
|
data = self.session.listBuildroots(tagID=tag_id,
|
|
queryOpts={'order': '-create_event_id', 'limit': 100})
|
|
# XXX magic number (limit)
|
|
if data:
|
|
tag_name = data[0]['tag_name']
|
|
else:
|
|
tag_name = "#%i" % tag_id
|
|
stats = {'data': data, 'ts': now, 'tag_name': tag_name}
|
|
recent = [x for x in data if now - x['create_ts'] < 3600 * 24]
|
|
# XXX magic number
|
|
stats['n_recent'] = len(recent)
|
|
self.tag_use_stats[tag_id] = stats
|
|
self.logger.debug("tag %s recent use count: %i" % (tag_name, len(recent)))
|
|
return stats
|
|
|
|
def setTagScore(self, entry):
|
|
"""Set score for needed_tag entry
|
|
|
|
We score the tags by two factors
|
|
- age of current repo
|
|
- last use in a buildroot
|
|
|
|
Having an older repo or a higher use count gives the tag a higher
|
|
priority for regen. The formula attempts to keep the last use factor
|
|
from overpowering, so that tags with very old repos still get priority
|
|
"""
|
|
|
|
stats = self.tagUseStats(entry['taginfo']['id'])
|
|
# normalize use count
|
|
max_n = max([t.get('n_recent', 0) for t in self.needed_tags.values()] or [1])
|
|
if max_n == 0:
|
|
# no recent use or missing data
|
|
max_n = 1
|
|
adj = stats['n_recent'] * 9.0 // max_n + 1 # 1.0 to 10.0
|
|
ts = entry['expire_ts']
|
|
age = time.time() - ts
|
|
# XXX - need to make sure our times aren't far off, otherwise this
|
|
# scoring could have the opposite of the desired effect
|
|
if age < 0:
|
|
self.logger.warning("Needed tag has future expire_ts: %r", entry)
|
|
age = 0
|
|
entry['score'] = age * adj
|
|
self.logger.debug("Needed tag %s got score %.2f",
|
|
entry['taginfo']['name'], entry['score'])
|
|
# so a day old unused repo gets about the regen same score as a
|
|
# 2.4-hour-old, very popular repo
|
|
|
|
def updateTagScores(self):
|
|
for entry in list(self.needed_tags.values()):
|
|
self.setTagScore(entry)
|
|
|
|
def _delete_needed_tag(self, tag_id):
|
|
try:
|
|
del self.needed_tags[tag_id]
|
|
except KeyError:
|
|
pass
|
|
|
|
def updateRepos(self):
|
|
self.logger.debug("Updating repos")
|
|
|
|
self.readCurrentRepos()
|
|
|
|
# check for stale repos
|
|
for repo in to_list(self.repos.values()):
|
|
if repo.stale():
|
|
repo.expire()
|
|
|
|
# find out which tags require repos
|
|
self.checkNeeded()
|
|
|
|
self.updateTagScores()
|
|
|
|
if self.options.queue_file:
|
|
with open(self.options.queue_file, "wt", encoding='utf-8') as f:
|
|
fmt = "%-40s %7s %5s\n"
|
|
f.write(fmt % ("Tag", "Expired", "Score"))
|
|
for tag in sorted(self.needed_tags.values(), key=lambda t: t['score'],
|
|
reverse=True):
|
|
time_expired = time.time() - tag['expire_ts']
|
|
f.write(fmt % (tag['taginfo']['name'], int(time_expired), int(tag['score'])))
|
|
|
|
def checkTasks(self):
|
|
"""Check on newRepo tasks
|
|
|
|
- update taskinfo
|
|
- remove finished tasks
|
|
- check for other newRepo tasks (not generated by us)
|
|
"""
|
|
|
|
# check on current tasks
|
|
task_ids = list(self.tasks)
|
|
self.session.multicall = True
|
|
for task_id in task_ids:
|
|
self.session.getTaskInfo(task_id)
|
|
for task_id, [tinfo] in zip(task_ids, self.session.multiCall(strict=True)):
|
|
tstate = koji.TASK_STATES[tinfo['state']]
|
|
tag_id = self.tasks[task_id]['tag_id']
|
|
if tstate == 'CLOSED':
|
|
self.logger.info("Finished: newRepo task %s for tag %s", task_id, tag_id)
|
|
del self.tasks[task_id]
|
|
self._delete_needed_tag(tag_id)
|
|
elif tstate in ('CANCELED', 'FAILED'):
|
|
self.logger.info(
|
|
"Problem: newRepo task %s for tag %s is %s", task_id, tag_id, tstate)
|
|
del self.tasks[task_id]
|
|
else:
|
|
self.tasks[task_id]['taskinfo'] = tinfo
|
|
# TODO: implement a timeout
|
|
|
|
# also check other newRepo tasks
|
|
repo_tasks = self.session.listTasks(opts={'method': 'newRepo',
|
|
'state': ([koji.TASK_STATES[s]
|
|
for s in ('FREE', 'OPEN')])})
|
|
others = [t for t in repo_tasks if t['id'] not in self.tasks]
|
|
for tinfo in others:
|
|
if tinfo['id'] not in self.other_tasks:
|
|
self.logger.info("Untracked newRepo task: %(id)i", tinfo)
|
|
# note: possible race here, but only a log message
|
|
# TODO - determine tag and maven support
|
|
self.other_tasks = dict([(t['id'], t) for t in others])
|
|
|
|
def checkNeeded(self):
|
|
"""Determine which tags currently need regeneration"""
|
|
|
|
n_need = len(self.needed_tags)
|
|
ignore = self.options.ignore_tags.split()
|
|
build_tags = set()
|
|
ignored_build_tags = set()
|
|
for t in self.session.getBuildTargets():
|
|
if koji.util.multi_fnmatch(t['build_tag_name'], ignore):
|
|
ignored_build_tags.add(t['build_tag'])
|
|
else:
|
|
build_tags.add(t['build_tag'])
|
|
|
|
# index repos by tag
|
|
tag_repos = {}
|
|
for repo in to_list(self.repos.values()):
|
|
tag_repos.setdefault(repo.tag_id, []).append(repo)
|
|
|
|
for tag_id in build_tags:
|
|
covered = False
|
|
for repo in tag_repos.get(tag_id, []):
|
|
if repo.current:
|
|
covered = True
|
|
break
|
|
elif repo.pending():
|
|
# one on the way
|
|
covered = True
|
|
break
|
|
if tag_id in self.needed_tags:
|
|
entry = self.needed_tags[tag_id]
|
|
if covered:
|
|
# no longer needed
|
|
self.logger.info("Tag %(name)s has a current or in "
|
|
"progress repo", entry['taginfo'])
|
|
self._delete_needed_tag(tag_id)
|
|
# if not covered, we already know
|
|
continue
|
|
if covered:
|
|
continue
|
|
|
|
# we haven't noted this need yet
|
|
taginfo = self.session.getTag(tag_id)
|
|
# (not using the caching version since we only call upon discovery)
|
|
if not taginfo:
|
|
self.logger.warning('Tag disappeared: %i', tag_id)
|
|
continue
|
|
self.logger.info('Tag needs regen: %(name)s', taginfo)
|
|
|
|
# how expired are we?
|
|
ts = 0
|
|
for repo in tag_repos.get(tag_id, []):
|
|
if repo.expire_ts:
|
|
if repo.expire_ts > ts:
|
|
ts = repo.expire_ts
|
|
else:
|
|
self.logger.warning("No expire timestamp for repo: %s", repo.repo_id)
|
|
if ts == 0:
|
|
ts = time.time()
|
|
|
|
entry = {
|
|
'taginfo': taginfo,
|
|
'expire_ts': ts,
|
|
'needed_since': time.time(),
|
|
}
|
|
self.setTagScore(entry)
|
|
self.needed_tags[tag_id] = entry
|
|
|
|
# some cleanup
|
|
for tag_id in list(self.needed_tags):
|
|
entry = self.needed_tags.get(tag_id)
|
|
if tag_id not in build_tags:
|
|
self.logger.info("Tag %(name)s is no longer a build tag",
|
|
entry['taginfo'])
|
|
self._delete_needed_tag(tag_id)
|
|
for tag_id, repolist in tag_repos.items():
|
|
if tag_id not in build_tags and tag_id not in ignored_build_tags:
|
|
# repos for these tags are no longer required
|
|
for repo in repolist:
|
|
if repo.dist:
|
|
# Dist repos should expire only basd on the time
|
|
continue
|
|
if repo.ready():
|
|
repo.expire()
|
|
|
|
if n_need != len(self.needed_tags):
|
|
self.logger.info('Needed tags count went from %i to %i', n_need,
|
|
len(self.needed_tags))
|
|
|
|
def regenRepos(self):
|
|
"""Trigger newRepo tasks for needed tags"""
|
|
|
|
self.checkTasks()
|
|
self.logger.debug("Current tasks: %r" % self.tasks)
|
|
if self.other_tasks:
|
|
self.logger.debug("Found %i untracked newRepo tasks",
|
|
len(self.other_tasks))
|
|
|
|
# first note currently running tasks
|
|
running_tasks = 0
|
|
running_tasks_maven = 0
|
|
for task in self.tasks.values():
|
|
if task['taskinfo']['waiting']:
|
|
self.logger.debug("Task %(id)i is waiting", task)
|
|
else:
|
|
# The largest hub impact is from the first part of the newRepo
|
|
# task. Once it is waiting on subtasks, that part is over
|
|
running_tasks += 1
|
|
if task['maven']:
|
|
running_tasks_maven += 1
|
|
|
|
debuginfo_pat = self.options.debuginfo_tags.split()
|
|
src_pat = self.options.source_tags.split()
|
|
separate_src_pat = self.options.separate_source_tags.split()
|
|
order = sorted(self.needed_tags.values(), key=lambda t: t['score'], reverse=True)
|
|
for tag in order:
|
|
if running_tasks >= self.options.max_repo_tasks:
|
|
self.logger.debug("Running tasks (%s): %s" % (running_tasks, list(self.tasks)))
|
|
self.logger.info("Maximum number of repo tasks reached")
|
|
return
|
|
elif len(self.tasks) + len(self.other_tasks) >= self.options.repo_tasks_limit:
|
|
self.logger.debug("Tracked tasks (%s): %s" % (len(self.tasks), list(self.tasks)))
|
|
self.logger.debug("Untracked tasks (%s): %s" % (len(self.other_tasks),
|
|
list(self.other_tasks)))
|
|
self.logger.info("Repo task limit reached")
|
|
return
|
|
tagname = tag['taginfo']['name']
|
|
task_id = tag.get('task_id')
|
|
if task_id:
|
|
if task_id in self.tasks:
|
|
# we already have a task
|
|
continue
|
|
else:
|
|
# should not happen
|
|
logger.warning('Needed tag refers to unknown task. '
|
|
'%s -> %i', tagname, task_id)
|
|
# we'll advance and create a new task
|
|
taskopts = {}
|
|
if koji.util.multi_fnmatch(tagname, debuginfo_pat):
|
|
taskopts['debuginfo'] = True
|
|
if koji.util.multi_fnmatch(tagname, src_pat):
|
|
taskopts['src'] = True
|
|
if koji.util.multi_fnmatch(tagname, separate_src_pat):
|
|
taskopts['separate_src'] = True
|
|
maven = tag['taginfo']['maven_support']
|
|
if maven:
|
|
if running_tasks_maven >= self.options.max_repo_tasks_maven:
|
|
continue
|
|
try:
|
|
task_id = self.session.newRepo(tagname, **taskopts)
|
|
except koji.GenericError:
|
|
# potentially deleted tag
|
|
if not self.session.getTag(tagname):
|
|
self._delete_needed_tag(tag['taginfo']['id'])
|
|
logger.debug('Needed tag %s (%d) was deleted meanwhile',
|
|
tagname, tag['taginfo']['id'])
|
|
continue
|
|
else:
|
|
raise
|
|
running_tasks += 1
|
|
if maven:
|
|
running_tasks_maven += 1
|
|
expire_ts = tag['expire_ts']
|
|
if expire_ts == 0: # can this still happen?
|
|
time_expired = '???'
|
|
else:
|
|
time_expired = "%.1f" % (time.time() - expire_ts)
|
|
self.logger.info("Created newRepo task %s for tag %s (%s), "
|
|
"expired for %s sec", task_id, tag['taginfo']['id'],
|
|
tag['taginfo']['name'], time_expired)
|
|
self.tasks[task_id] = {
|
|
'id': task_id,
|
|
'taskinfo': self.session.getTaskInfo(task_id),
|
|
'tag_id': tag['taginfo']['id'],
|
|
'maven': maven,
|
|
}
|
|
tag['task_id'] = task_id
|
|
if running_tasks_maven >= self.options.max_repo_tasks_maven:
|
|
self.logger.info("Maximum number of maven repo tasks reached")
|
|
|
|
def deleteRepos(self):
|
|
# trigger deletes
|
|
self.delete_logger.debug("Starting delete repos")
|
|
n = 0
|
|
for repo in to_list(self.repos.values()):
|
|
if repo.expired():
|
|
# try to delete
|
|
if repo.tryDelete(self.delete_logger):
|
|
try:
|
|
del self.repos[repo.repo_id]
|
|
except KeyError:
|
|
# during tryDelete repo record is deleted on hub
|
|
# if readCurrentRepos is called meanwhile, it could have
|
|
# cleaned self.repos already
|
|
pass
|
|
n += 1
|
|
self.delete_logger.debug("Ending delete repos (queued for deletion: %s)" % n)
|
|
|
|
|
|
def start_currency_checker(session, repomgr):
|
|
subsession = session.subsession()
|
|
thread = threading.Thread(name='currencyChecker',
|
|
target=repomgr.currencyChecker, args=(subsession,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
return thread
|
|
|
|
|
|
def start_external_currency_checker(session, repomgr):
|
|
subsession = session.subsession()
|
|
thread = threading.Thread(name='currencyExternalChecker',
|
|
target=repomgr.currencyExternalChecker, args=(subsession,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
return thread
|
|
|
|
|
|
def start_regen_loop(session, repomgr):
|
|
subsession = session.subsession()
|
|
thread = threading.Thread(name='regenLoop',
|
|
target=repomgr.regenLoop, args=(subsession,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
return thread
|
|
|
|
|
|
def start_delete_loop(session, repomgr):
|
|
subsession = session.subsession()
|
|
thread = threading.Thread(name='deleteLoop',
|
|
target=repomgr.deleteLoop, args=(subsession,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
return thread
|
|
|
|
|
|
def start_rmtree_loop(session, repomgr):
|
|
subsession = session.subsession()
|
|
thread = threading.Thread(name='rmtreeLoop',
|
|
target=repomgr.rmtreeLoop, args=(subsession,))
|
|
thread.daemon = True
|
|
thread.start()
|
|
return thread
|
|
|
|
|
|
def main(options, session):
|
|
repomgr = RepoManager(options, session)
|
|
repomgr.readCurrentRepos()
|
|
|
|
def shutdown(*args):
|
|
raise SystemExit
|
|
signal.signal(signal.SIGTERM, shutdown)
|
|
curr_chk_thread = start_currency_checker(session, repomgr)
|
|
if options.check_external_repos:
|
|
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
|
|
regen_thread = start_regen_loop(session, repomgr)
|
|
delete_thread = start_delete_loop(session, repomgr)
|
|
rmtree_thread = start_rmtree_loop(session, repomgr)
|
|
# TODO also move rmtree jobs to threads
|
|
logger.info("Entering main loop")
|
|
while True:
|
|
try:
|
|
repomgr.updateRepos()
|
|
repomgr.printState()
|
|
repomgr.pruneLocalRepos()
|
|
if not curr_chk_thread.is_alive():
|
|
logger.error("Currency checker thread died. Restarting it.")
|
|
curr_chk_thread = start_currency_checker(session, repomgr)
|
|
if options.check_external_repos and not curr_ext_chk_thread.is_alive():
|
|
logger.error("External currency checker thread died. Restarting it.")
|
|
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
|
|
if not regen_thread.is_alive():
|
|
logger.error("Regeneration thread died. Restarting it.")
|
|
regen_thread = start_regen_loop(session, repomgr)
|
|
if not delete_thread.is_alive():
|
|
logger.error("Delete thread died. Restarting it.")
|
|
delete_thread = start_delete_loop(session, repomgr)
|
|
if not rmtree_thread.is_alive():
|
|
logger.error("rmtree thread died. Restarting it.")
|
|
rmtree_thread = start_rmtree_loop(session, repomgr)
|
|
except KeyboardInterrupt:
|
|
logger.warning("User exit")
|
|
break
|
|
except koji.AuthExpired:
|
|
logger.warning("Session expired")
|
|
break
|
|
except koji.AuthError:
|
|
logger.warning("Authentication error")
|
|
break
|
|
except SystemExit:
|
|
logger.warning("Shutting down")
|
|
break
|
|
except Exception:
|
|
# log the exception and continue
|
|
logger.error(''.join(traceback.format_exception(*sys.exc_info())))
|
|
try:
|
|
time.sleep(options.sleeptime)
|
|
except KeyboardInterrupt:
|
|
logger.warning("User exit")
|
|
break
|
|
try:
|
|
repomgr.checkQueue()
|
|
repomgr.killChildren()
|
|
finally:
|
|
session.logout()
|
|
|
|
|
|
def get_options():
|
|
"""process options from command line and config file"""
|
|
# parse command line args
|
|
parser = OptionParser("usage: %prog [opts]")
|
|
parser.add_option("-c", "--config", dest="configFile",
|
|
help="use alternate configuration file", metavar="FILE",
|
|
default="/etc/kojira/kojira.conf")
|
|
parser.add_option("--user", help="specify user")
|
|
parser.add_option("--password", help="specify password")
|
|
parser.add_option("--principal", help="Kerberos principal")
|
|
parser.add_option("--keytab", help="Kerberos keytab")
|
|
parser.add_option("-f", "--fg", dest="daemon",
|
|
action="store_false", default=True,
|
|
help="run in foreground")
|
|
parser.add_option("-d", "--debug", action="store_true",
|
|
help="show debug output")
|
|
parser.add_option("-q", "--quiet", action="store_true",
|
|
help="don't show warnings")
|
|
parser.add_option("-v", "--verbose", action="store_true",
|
|
help="show verbose output")
|
|
parser.add_option("--force-lock", action="store_true", default=False,
|
|
help="force lock for exclusive session")
|
|
parser.add_option("--debug-xmlrpc", action="store_true", default=False,
|
|
help="show xmlrpc debug output")
|
|
parser.add_option("--skip-main", action="store_true", default=False,
|
|
help="don't actually run main")
|
|
parser.add_option("--show-config", action="store_true", default=False,
|
|
help="Show config and exit")
|
|
parser.add_option("--sleeptime", type='int', help="Specify the polling interval")
|
|
parser.add_option("-s", "--server", help="URL of XMLRPC server")
|
|
parser.add_option("--topdir", help="Specify topdir")
|
|
parser.add_option("--logfile", help="Specify logfile")
|
|
parser.add_option("--queue-file",
|
|
help="If specified, queue is dumped to separate status file each cycle")
|
|
(options, args) = parser.parse_args()
|
|
|
|
config = koji.read_config_files(options.configFile)
|
|
section = 'kojira'
|
|
for x in config.sections():
|
|
if x != section:
|
|
quit('invalid section found in config file: %s' % x)
|
|
defaults = {'debuginfo_tags': '',
|
|
'source_tags': '',
|
|
'separate_source_tags': '',
|
|
'ignore_tags': '',
|
|
'verbose': False,
|
|
'debug': False,
|
|
'ignore_stray_repos': False,
|
|
'topdir': '/mnt/koji',
|
|
'server': None,
|
|
'logfile': '/var/log/kojira.log',
|
|
'principal': None,
|
|
'keytab': '/etc/kojira/kojira.keytab',
|
|
'ccache': '/var/tmp/kojira.ccache',
|
|
'retry_interval': 60,
|
|
'max_retries': 120,
|
|
'offline_retry': True,
|
|
'offline_retry_interval': 120,
|
|
'no_ssl_verify': False,
|
|
'max_delete_processes': 4,
|
|
'max_repo_tasks': 4,
|
|
'max_repo_tasks_maven': 2,
|
|
'repo_tasks_limit': 10,
|
|
'deleted_repo_lifetime': 7 * 24 * 3600,
|
|
# XXX should really be called expired_repo_lifetime
|
|
'dist_repo_lifetime': 7 * 24 * 3600,
|
|
'check_external_repos': False,
|
|
'sleeptime': 15,
|
|
'cert': None,
|
|
'serverca': None,
|
|
'queue_file': None,
|
|
'ignore_other_volumes': False,
|
|
}
|
|
if config.has_section(section):
|
|
int_opts = ('deleted_repo_lifetime', 'max_repo_tasks', 'repo_tasks_limit',
|
|
'retry_interval', 'max_retries', 'offline_retry_interval',
|
|
'max_delete_processes', 'max_repo_tasks_maven', 'dist_repo_lifetime',
|
|
'sleeptime')
|
|
str_opts = ('topdir', 'server', 'user', 'password', 'logfile', 'principal', 'keytab',
|
|
'cert', 'serverca', 'debuginfo_tags', 'queue_file',
|
|
'source_tags', 'separate_source_tags', 'ignore_tags')
|
|
bool_opts = ('verbose', 'debug', 'ignore_stray_repos', 'offline_retry',
|
|
'no_ssl_verify', 'check_external_repos', 'ignore_other_volumes')
|
|
legacy_opts = ('with_src', 'delete_batch_size', 'recent_tasks_lifetime')
|
|
for name in config.options(section):
|
|
if name in int_opts:
|
|
defaults[name] = config.getint(section, name)
|
|
elif name in str_opts:
|
|
defaults[name] = config.get(section, name)
|
|
elif name in bool_opts:
|
|
defaults[name] = config.getboolean(section, name)
|
|
elif name in legacy_opts:
|
|
deprecated('The %s configuration option is no longer used\n' % name)
|
|
else:
|
|
quit("unknown config option: %s" % name)
|
|
for name, value in defaults.items():
|
|
if getattr(options, name, None) is None:
|
|
setattr(options, name, value)
|
|
if options.logfile in ('', 'None', 'none'):
|
|
options.logfile = None
|
|
# special handling for cert defaults
|
|
cert_defaults = {
|
|
'cert': '/etc/kojira/client.crt',
|
|
'serverca': '/etc/kojira/serverca.crt',
|
|
}
|
|
for name in cert_defaults:
|
|
if getattr(options, name, None) is None:
|
|
fn = cert_defaults[name]
|
|
if os.path.exists(fn):
|
|
setattr(options, name, fn)
|
|
return options
|
|
|
|
|
|
def quit(msg=None, code=1):
|
|
if msg:
|
|
logging.getLogger("koji.repo").error(msg)
|
|
sys.stderr.write('%s\n' % msg)
|
|
sys.stderr.flush()
|
|
sys.exit(code)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
options = get_options()
|
|
topdir = getattr(options, 'topdir', None)
|
|
pathinfo = koji.PathInfo(topdir)
|
|
if options.show_config:
|
|
pprint.pprint(options.__dict__)
|
|
sys.exit()
|
|
if options.logfile:
|
|
if not os.path.exists(options.logfile):
|
|
try:
|
|
logfile = open(options.logfile, "w")
|
|
logfile.close()
|
|
except Exception:
|
|
sys.stderr.write("Cannot create logfile: %s\n" % options.logfile)
|
|
sys.exit(1)
|
|
if not os.access(options.logfile, os.W_OK):
|
|
sys.stderr.write("Cannot write to logfile: %s\n" % options.logfile)
|
|
sys.exit(1)
|
|
koji.add_file_logger("koji", options.logfile)
|
|
# note we're setting logging for koji.*
|
|
logger = logging.getLogger("koji")
|
|
if options.debug:
|
|
logger.setLevel(logging.DEBUG)
|
|
elif options.verbose:
|
|
logger.setLevel(logging.INFO)
|
|
elif options.quiet:
|
|
logger.setLevel(logging.ERROR)
|
|
else:
|
|
logger.setLevel(logging.WARNING)
|
|
|
|
session_opts = koji.grab_session_options(options)
|
|
session = koji.ClientSession(options.server, session_opts)
|
|
try:
|
|
if options.cert is not None and os.path.isfile(options.cert):
|
|
# authenticate using SSL client certificates
|
|
session.ssl_login(options.cert, None, options.serverca)
|
|
elif options.user:
|
|
# authenticate using user/password
|
|
session.login()
|
|
elif koji.reqgssapi and options.principal and options.keytab:
|
|
session.gssapi_login(options.principal, options.keytab, options.ccache)
|
|
else:
|
|
quit("No username/password/certificate supplied and Kerberos missing or "
|
|
"not configured")
|
|
except koji.AuthError as ex:
|
|
quit(str(ex))
|
|
# get an exclusive session
|
|
try:
|
|
session.exclusiveSession(force=options.force_lock)
|
|
except koji.AuthLockError:
|
|
quit("Error: Unable to get lock. Trying using --force-lock")
|
|
if not session.logged_in:
|
|
quit("Error: Unknown login error")
|
|
if not session.logged_in:
|
|
print("Error: unable to log in")
|
|
sys.exit(1)
|
|
if options.skip_main:
|
|
sys.exit()
|
|
elif options.daemon:
|
|
koji.daemonize()
|
|
else:
|
|
koji.add_stderr_logger("koji")
|
|
main(options, session)
|