932 lines
36 KiB
Python
Executable file
932 lines
36 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
# Koji Repository Administrator (kojira)
|
|
# Copyright (c) 2005-2014 Red Hat, Inc.
|
|
#
|
|
# Koji is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation;
|
|
# version 2.1 of the License.
|
|
#
|
|
# This software is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this software; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
#
|
|
# Authors:
|
|
# Mike McLean <mikem@redhat.com>
|
|
|
|
import errno
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import pprint
|
|
import signal
|
|
import stat
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from optparse import OptionParser, SUPPRESS_HELP
|
|
from xml.etree import ElementTree
|
|
|
|
from collections import OrderedDict
|
|
|
|
import requests
|
|
|
|
import koji
|
|
from koji.util import deprecated, parseStatus, rmtree, to_list, dslice
|
|
|
|
|
|
class ManagedRepo(object):
|
|
|
|
def __init__(self, manager, data):
|
|
self.manager = manager
|
|
self.options = manager.options
|
|
self.logger = logging.getLogger("koji.repo")
|
|
self.data = data
|
|
self.id = self.repo_id = data['id']
|
|
self.tag_id = data['tag_id']
|
|
self.dist = data['dist']
|
|
self.first_seen = time.time()
|
|
|
|
@property
|
|
def session(self):
|
|
# return actual thread session object
|
|
return self.manager.session
|
|
|
|
@property
|
|
def state(self):
|
|
return self.data['state']
|
|
|
|
@state.setter
|
|
def state(self, value):
|
|
self.data['state'] = value
|
|
|
|
def get_info(self):
|
|
"Fetch data from repo.json"
|
|
path = self.get_path()
|
|
fn = '%s/repo.json' % path
|
|
if not os.path.exists(fn):
|
|
self.logger.warning('Repo info file missing: %s', fn)
|
|
return None
|
|
return koji.load_json(fn)
|
|
|
|
def get_path(self, volume=None):
|
|
"""Return the path to the repo directory"""
|
|
tag_name = self.data['tag_name']
|
|
if self.dist:
|
|
path = pathinfo.distrepo(self.repo_id, tag_name, volume=volume)
|
|
else:
|
|
# currently only dist repos can be on another volume
|
|
path = pathinfo.repo(self.repo_id, tag_name)
|
|
return path
|
|
|
|
def expire(self):
|
|
"""Mark the repo expired"""
|
|
if self.state == koji.REPO_EXPIRED:
|
|
return
|
|
elif self.state == koji.REPO_PROBLEM:
|
|
# should not happen
|
|
raise koji.GenericError("Can't expire problem repo %s" % self.id)
|
|
elif self.state == koji.REPO_DELETED:
|
|
# should not happen
|
|
raise koji.GenericError("Repo %s already deleted" % self.id)
|
|
if self.dist:
|
|
self.logger.info("Expiring dist repo %(id)s for tag %(tag_name)s", self.data)
|
|
else:
|
|
self.logger.info("Expiring repo %(id)s for tag %(tag_name)s", self.data)
|
|
self.session.repoExpire(self.repo_id)
|
|
self.state = koji.REPO_EXPIRED
|
|
|
|
def mark_problem(self):
|
|
"""Flag a problem repo"""
|
|
if self.state == koji.REPO_DELETED:
|
|
# should not happen
|
|
raise koji.GenericError("Repo is deleted")
|
|
self.logger.error("Problem repo %s.." % self.repo_id)
|
|
self.session.repoProblem(self.repo_id)
|
|
self.state = koji.REPO_PROBLEM
|
|
|
|
def check_init(self):
|
|
"""Check on repos in init state"""
|
|
if self.state != koji.REPO_INIT:
|
|
return
|
|
|
|
if not self.data.get('creation_ts'):
|
|
self.logger.warning('Repo in init state lacks timestamp: %r', self.data)
|
|
self.mark_problem()
|
|
return
|
|
|
|
age = time.time() - self.data['creation_ts']
|
|
if age > self.options.init_timeout:
|
|
self.logger.warning("Stale repo: %r", self.data)
|
|
self.mark_problem()
|
|
elif self.data['task_state'] in [koji.TASK_STATES[n] for n in ('CANCELED', 'FAILED')]:
|
|
self.logger.warning("Repo task failed: %(task_id)s", self.data)
|
|
self.mark_problem()
|
|
|
|
def get_age(self):
|
|
if self.data.get('state_ts'):
|
|
return time.time() - self.data['state_ts']
|
|
|
|
# otherwise repo predates 1.35, fall back to mtime
|
|
self.logger.warning("Missing timestamps for repo %(id)s (tag %(tag_name)s)", self.data)
|
|
path = self.get_path()
|
|
try:
|
|
return time.time() - os.stat(path).st_mtime
|
|
except OSError as e:
|
|
self.logger.error("Can't read mtime for %s - %s", path, e)
|
|
return time.time() - self.first_seen
|
|
|
|
    def expire_check(self):
        """Expire this repo once it is stale, past its lifetime, and not worth keeping

        Side effect: records the time of the last expensive check in
        self.expire_check_ts so the hub is not requeried every cycle.
        """
        if self.state != koji.REPO_READY:
            return

        if self.data['end_event'] is None and not self.data['custom_opts']:
            # repo is current and has default options. keep it
            # this covers current dist repos, where custom_opts=None
            return

        # keep repos for configured lifetime
        if self.dist:
            lifetime = self.options.dist_repo_lifetime
        else:
            lifetime = self.options.repo_lifetime
        if self.get_age() <= lifetime:
            return

        # remaining checks are more expensive, don't recheck every cycle
        last_check = getattr(self, 'expire_check_ts', None)
        if last_check and time.time() - last_check < self.options.recheck_period:
            return
        self.expire_check_ts = time.time()

        # keep latest default repo in some cases, even if not current
        if self.dist:
            # no target check -- they are irrelevant for dist repos
            if self.is_latest():
                return
        elif not self.data['custom_opts']:
            # normal repo, default options
            # only keep it if some build target still uses this tag as a build tag
            targets = self.session.getBuildTargets(buildTagID=self.data['tag_id'])
            if targets and self.is_latest():
                return

        # no reason found to keep it
        self.expire()
|
|
|
|
def is_latest(self):
|
|
"""Check if repo is latest for its tag (not necessarily current)"""
|
|
# similar query to symlink_if_latest on hub
|
|
clauses = [
|
|
['tag_id', '=', self.data['tag_id']],
|
|
['state', '=', koji.REPO_READY],
|
|
['create_event', '>', self.data['create_event']],
|
|
]
|
|
if self.dist:
|
|
clauses.append(['dist', '=', True])
|
|
else:
|
|
clauses.append(['dist', '=', False])
|
|
clauses.append(['custom_opts', '=', '{}'])
|
|
# ^this clause is only for normal repos, dist repos have custom_opts=None
|
|
newer = self.session.repo.query(clauses, ['id'])
|
|
return not newer # True if no newer matching repo
|
|
|
|
    def delete_check(self):
        """Delete the repo if appropriate

        Expired/problem repos are deleted once old enough and unreferenced.
        Side effect: records the time of the last reference lookup in
        self.reference_ts so the hub is not requeried every cycle.
        """

        # correct state?
        if self.state not in (koji.REPO_EXPIRED, koji.REPO_PROBLEM):
            # shouldn't happen
            self.logger.error('Repo %(id)s cannot be deleted, state=%(state)s', self.data)
            return

        # too young to delete?
        if self.get_age() < self.options.expired_repo_lifetime:
            return

        # reference check
        last_check = getattr(self, 'reference_ts', None)
        if last_check and time.time() - last_check < self.options.reference_recheck_period:
            # we don't need to recheck every cycle
            return
        refs = self.session.repo.references(self.repo_id)
        if refs:
            # this shouldn't happen normally, may indicate a stale build task
            self.logger.warning('Repo %s still has %i references' % (self.repo_id, len(refs)))
            if self.logger.isEnabledFor(logging.DEBUG):
                for ref in refs:
                    self.logger.debug('Ref: %r', ref)
            # remember when we found references so we can throttle rechecks
            self.reference_ts = time.time()
            return

        # ok, safe to delete
        self.delete()
|
|
|
|
def delete(self):
|
|
"""Mark repo deleted and initiate file removal
|
|
|
|
* remove volume symlinks immediately
|
|
* queue rmtree job for rest of files
|
|
"""
|
|
|
|
if self.state not in (koji.REPO_EXPIRED, koji.REPO_PROBLEM):
|
|
# should not happen
|
|
raise koji.GenericError("Repo cannot be deleted, state=%s", self.state)
|
|
|
|
# mark deleted in the db
|
|
self.session.repo.setState(self.repo_id, koji.REPO_DELETED)
|
|
self.logger.info("Set repo %s state to deleted" % self.repo_id)
|
|
self.state = koji.REPO_DELETED
|
|
|
|
# deal with volume symlinks
|
|
path = self.get_path()
|
|
if os.path.islink(path):
|
|
# expected for repos on other volumes
|
|
info = self.get_info()
|
|
if not os.path.exists(path):
|
|
self.logger.error('Repo volume link broken: %s', path)
|
|
return False
|
|
if not info or 'volume' not in info:
|
|
self.logger.error('Missing or invalid repo.json in %s', path)
|
|
return False
|
|
realpath = self.get_path(volume=info['volume'])
|
|
if not os.path.exists(realpath):
|
|
self.logger.error('Repo real path missing: %s', realpath)
|
|
return False
|
|
if self.options.ignore_other_volumes:
|
|
# don't delete from other volumes
|
|
self.logger.error('Repo on non-default volume %s', realpath)
|
|
return False
|
|
if not os.path.samefile(path, realpath):
|
|
self.logger.error('Incorrect volume link: %s', path)
|
|
return False
|
|
# ok, try to remove the symlink
|
|
try:
|
|
os.unlink(path)
|
|
except OSError:
|
|
self.logger.error('Unable to remove volume link: %s', path)
|
|
else:
|
|
realpath = path
|
|
|
|
# queue the rmtree job
|
|
self.manager.rmtree(realpath)
|
|
|
|
def handle_problem(self):
|
|
self.delete_check()
|
|
|
|
def is_expired(self):
|
|
return self.state == koji.REPO_EXPIRED
|
|
|
|
|
|
class RepoManager(object):
|
|
|
|
def __init__(self, options, session):
|
|
self.options = options
|
|
self._local = threading.local()
|
|
self._local.session = session
|
|
self.repos = {}
|
|
self.delete_pids = {}
|
|
self.delete_queue = OrderedDict()
|
|
self.logger = logging.getLogger("koji.repo.manager")
|
|
|
|
@property
|
|
def session(self):
|
|
# session is stored in our threadlocal instance
|
|
return self._local.session
|
|
|
|
@session.setter
|
|
def session(self, value):
|
|
self._local.session = value
|
|
|
|
def printState(self):
|
|
self.logger.debug('Tracking %i repos, %i child processes',
|
|
len(self.repos), len(self.delete_pids))
|
|
for pid in self.delete_pids:
|
|
path = self.delete_pids[pid][0]
|
|
self.logger.debug("Delete job %s: %r", pid, path)
|
|
|
|
def rmtree(self, path):
|
|
"""Spawn (or queue) and rmtree job"""
|
|
self.logger.info("Queuing rmtree job for %s", path)
|
|
if path not in self.delete_queue:
|
|
self.delete_queue[path] = 1
|
|
|
|
    def checkQueue(self):
        """Reap finished rmtree children and spawn queued jobs

        Finished jobs have their check_func run so background failures
        surface as errors; then new background rmtree jobs are started
        (oldest queued first) up to max_delete_processes.
        """
        finished = [pid for pid in self.delete_pids if self.waitPid(pid)]
        for pid in finished:
            path, check_func = self.delete_pids[pid]
            del self.delete_pids[pid]
            try:
                # check_func raises if the background rmtree failed
                check_func()
            except Exception as e:
                self.logger.error("Failed rmtree job for %s: %s", path, e)
                continue
            self.logger.info("Completed rmtree job for %s", path)
        # start new jobs, FIFO, while slots are free
        while self.delete_queue and len(self.delete_pids) < self.options.max_delete_processes:
            path, _ = self.delete_queue.popitem(last=False)
            pid, check_func = rmtree(path, background=True)  # koji.util.rmtree
            self.logger.info("Started rmtree (pid %i) for %s", pid, path)
            self.delete_pids[pid] = (path, check_func)
|
|
|
|
def waitPid(self, pid):
|
|
# XXX - can we unify with TaskManager?
|
|
prefix = "pid %i (%s)" % (pid, self.delete_pids.get(pid)[0])
|
|
try:
|
|
(childpid, status) = os.waitpid(pid, os.WNOHANG)
|
|
except OSError as e:
|
|
if e.errno != errno.ECHILD:
|
|
# should not happen
|
|
raise
|
|
# otherwise assume the process is gone
|
|
self.logger.info("%s: %s" % (prefix, e))
|
|
return True
|
|
if childpid != 0:
|
|
self.logger.info(parseStatus(status, prefix))
|
|
return True
|
|
return False
|
|
|
|
def killChildren(self):
|
|
# XXX - unify with TaskManager?
|
|
sig = signal.SIGTERM
|
|
for pid in self.delete_pids:
|
|
try:
|
|
os.kill(pid, sig)
|
|
except OSError as e:
|
|
if e.errno != errno.ESRCH:
|
|
logger.error("Unable to kill process %s", pid)
|
|
|
|
    def readCurrentRepos(self):
        """Sync self.repos with the hub's view of non-deleted repos

        New repos get a ManagedRepo entry; known ones have their data
        refreshed. Entries that disappeared from the hub are dropped,
        with a warning unless they were already in the deleted state.
        """
        self.logger.debug("Reading current repo data")
        clauses = [['state', '!=', koji.REPO_DELETED]]
        fields = ('id', 'tag_id', 'create_event', 'state', 'dist', 'task_id', 'tag_name',
                  'creation_ts', 'state_ts', 'end_event', 'opts', 'custom_opts', 'task_state')
        repodata = self.session.repo.query(clauses, fields)
        self.logger.debug("Repo data: %r" % repodata)

        for data in repodata:
            repo_id = data['id']
            repo = self.repos.get(repo_id)
            if repo:
                # we're already tracking it
                if repo.state != data['state']:
                    self.logger.info(
                        'State changed for repo %s: %s -> %s',
                        repo_id, koji.REPO_STATES[repo.state], koji.REPO_STATES[data['state']])
                repo.data = data
            else:
                self.logger.info('Found repo %s, state=%s'
                                 % (repo_id, koji.REPO_STATES[data['state']]))
                repo = ManagedRepo(self, data)
                self.repos[repo_id] = repo

        # if we track more repos than the hub reported, prune our map
        if len(self.repos) > len(repodata):
            active = set([r['id'] for r in repodata])
            for repo_id in list(self.repos):  # copy because we modify keys
                repo = self.repos[repo_id]
                if repo_id not in active:
                    if repo.state != koji.REPO_DELETED:
                        # we only expect this for deleted repos
                        self.logger.warning('Repo entry disappeared from hub: %r', repo.data)
                    else:
                        self.logger.info('Dropping entry for deleted repo: %s', repo_id)
                    del self.repos[repo_id]
|
|
|
|
    def checkExternalRepo(self, repodata, arches, ts_cache):
        """Update tracking data for external repo

        Fetches repomd.xml for each (arch-expanded) url, takes the newest
        timestamp across them, and records it on the hub if it advanced.

        :param dict repodata: external repo entry with 'id' and 'url'
        :param arches: list of arches to substitute for $arch (may be empty)
        :param dict ts_cache: url -> timestamp cache shared across one scan
        """
        url = repodata['url']

        # expand the arch urls if needed
        expanded_urls = [url]
        if '$arch' in url:
            if not arches:
                # caller should already have warned
                return
            expanded_urls = [url.replace('$arch', a) for a in arches]

        # get previously recorded timestamp, if any
        data = self.session.repo.getExternalRepoData(repodata['id']) or {}
        orig = data.get('max_ts', 0)

        # find latest timestamp across expanded urls
        new_ts = 0
        for arch_url in expanded_urls:
            arch_url = os.path.join(arch_url, 'repodata/repomd.xml')
            if arch_url in ts_cache:
                # just use the cache
                new_ts = max(new_ts, ts_cache[arch_url])
                continue
            self.logger.debug('Checking external url: %s' % arch_url)
            try:
                r = requests.get(arch_url, timeout=5)
                r.raise_for_status()
                root = ElementTree.fromstring(r.text)  # nosec
                # repomd.xml carries one <timestamp> per metadata section
                ts_elements = root.iter('{http://linux.duke.edu/metadata/repo}timestamp')
                arch_ts = max([round(float(child.text)) for child in ts_elements])
                ts_cache[arch_url] = arch_ts
                new_ts = max(new_ts, arch_ts)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    # we check all hub arches, so this can happen pretty easily
                    # we'll warn below if _no_ arches give us a timestamp
                    self.logger.debug("External repo url not found: %s", arch_url)
                else:
                    self.logger.warning("Error reading external repo url %s: %s", arch_url, e)
                ts_cache[arch_url] = 0
            except Exception:
                # inaccessible or without timestamps
                # treat repo as unchanged (ts = 0)
                self.logger.warning('Unable to read timestamp for external repo: %s', arch_url)
                ts_cache[arch_url] = 0

        if new_ts == 0:
            self.logger.warning('Unable to determine timestamp for external repo: %s', url)
        elif new_ts > orig:
            # only write back when the timestamp moved forward
            self.logger.info('Updating timestamp for external repo %s: %s', repodata['id'], new_ts)
            self.session.repo.setExternalRepoData(repodata['id'], {'max_ts': new_ts})
|
|
|
|
def checkExternalRepos(self):
|
|
"""Determine which external repos changed"""
|
|
# get active external repos
|
|
# we only bother checking those that are actually used in some tag
|
|
used = self.session.getTagExternalRepos()
|
|
external_repos = {}
|
|
# fields specific to the external repo entry
|
|
fields = ('external_repo_id', 'external_repo_name', 'url')
|
|
for tag_repo in used:
|
|
key = tag_repo['external_repo_id']
|
|
if key not in external_repos:
|
|
external_repos[key] = dslice(tag_repo, fields)
|
|
|
|
# get arches to check
|
|
arches = self.session.getAllArches()
|
|
# this is all _host_ arches, canonicalized, which should be fine for our purposes
|
|
# getting the list of all tag arches for all tags that might use the repo is
|
|
# way more expensive
|
|
if not arches:
|
|
self.logger.warning('No arches reported by hub. Are any hosts enabled? '
|
|
'Unable to check external repos containing $arch')
|
|
|
|
ts_cache = {}
|
|
for erepo_id in sorted(external_repos):
|
|
data = external_repos[erepo_id]
|
|
data['id'] = erepo_id
|
|
self.checkExternalRepo(data, arches, ts_cache)
|
|
|
|
def threadLoop(self, session, name):
|
|
"""Wrapper for running thread handlers in a loop"""
|
|
# we should be passed a subsession of main
|
|
self.session = session
|
|
handler = getattr(self, f'do_{name}')
|
|
self.logger = logging.getLogger(f'koji.repo.{name}')
|
|
self.logger.info(f'{name} thread starting')
|
|
try:
|
|
while True:
|
|
handler()
|
|
time.sleep(self.options.sleeptime)
|
|
except Exception:
|
|
self.logger.exception(f'Error in {name} thread')
|
|
raise
|
|
finally:
|
|
session.logout()
|
|
|
|
def do_currency(self):
|
|
"""Checks repos for currency"""
|
|
# this call can take a while
|
|
self.session.repo.updateEndEvents()
|
|
|
|
def do_check_external(self):
|
|
"""Check external repos"""
|
|
self.checkExternalRepos()
|
|
|
|
def do_regen(self):
|
|
"""Triggers regens as needed/possible"""
|
|
self.session.repo.checkQueue()
|
|
|
|
def do_autoregen(self):
|
|
"""Triggers automatic regens as needed/possible"""
|
|
self.session.repo.autoRequests()
|
|
|
|
def do_rmtree(self):
|
|
logger.debug('queue length: %d', len(self.delete_queue))
|
|
self.checkQueue()
|
|
|
|
def pruneLocalRepos(self):
|
|
# non-dist repos are always on the default volume
|
|
volname = 'DEFAULT'
|
|
volumedir = pathinfo.volumedir(volname)
|
|
repodir = "%s/repos" % volumedir
|
|
self._pruneLocalRepos(repodir)
|
|
|
|
for volinfo in self.session.listVolumes():
|
|
volname = volinfo['name']
|
|
if volname == 'DEFAULT':
|
|
continue
|
|
if self.options.ignore_other_volumes:
|
|
# don't prune from other volumes
|
|
continue
|
|
volumedir = pathinfo.volumedir(volname)
|
|
distrepodir = "%s/repos-dist" % volumedir
|
|
self._pruneLocalRepos(distrepodir)
|
|
|
|
def _pruneLocalRepos(self, topdir):
|
|
"""Scan filesystem for repos and remove any deleted or stray ones
|
|
|
|
Specifically, we remove:
|
|
- repo dirs the hub does not know about (unless ignore_stray_repos is true)
|
|
- repos the hub think are already deleted
|
|
|
|
We ignore:
|
|
- any repos with age < expired_repo_lifetime
|
|
- any repos we're already tracking
|
|
- any repos already in the delete queue
|
|
|
|
Also, we warn about unexpected content, but do not remove it
|
|
"""
|
|
|
|
max_age = self.options.expired_repo_lifetime
|
|
# TODO - do we need a different lifetime option here?
|
|
if self.delete_pids:
|
|
# skip
|
|
return
|
|
if not os.path.exists(topdir):
|
|
self.logger.debug("%s doesn't exist, skipping", topdir)
|
|
return
|
|
if not os.path.isdir(topdir):
|
|
self.logger.warning("%s is not directory, skipping", topdir)
|
|
return
|
|
self.logger.debug("Scanning %s for repos", topdir)
|
|
self.logger.debug('max age allowed: %s seconds', max_age)
|
|
for tag in os.listdir(topdir):
|
|
tagdir = "%s/%s" % (topdir, tag)
|
|
if not os.path.isdir(tagdir):
|
|
self.logger.debug("%s is not a directory, skipping", tagdir)
|
|
continue
|
|
|
|
# Remember where latest symlink points to (if exists)
|
|
repo_latest_path = "%s/latest" % tagdir
|
|
repo_latest_id = (
|
|
os.readlink(repo_latest_path)
|
|
if os.path.isdir(repo_latest_path)
|
|
else None
|
|
)
|
|
for repo_id in os.listdir(tagdir):
|
|
if repo_id == 'latest' or repo_id == repo_latest_id:
|
|
# ignore latest symlinks or repo where one points to
|
|
continue
|
|
try:
|
|
repo_id = int(repo_id)
|
|
except ValueError:
|
|
self.logger.debug("%s/%s not an int, skipping", tagdir, repo_id)
|
|
continue
|
|
if repo_id in self.repos:
|
|
# we're already managing it, no need to deal with it here
|
|
continue
|
|
repodir = "%s/%s" % (tagdir, repo_id)
|
|
if repodir in self.delete_queue:
|
|
# no need to queue again
|
|
continue
|
|
try:
|
|
# lstat because it could be link to another volume
|
|
dirstat = os.lstat(repodir)
|
|
except OSError:
|
|
# just in case something deletes the repo out from under us
|
|
self.logger.debug("%s deleted already?!", repodir)
|
|
continue
|
|
symlink = False
|
|
if stat.S_ISLNK(dirstat.st_mode):
|
|
symlink = True
|
|
elif not stat.S_ISDIR(dirstat.st_mode):
|
|
self.logger.debug("%s not a directory, skipping", repodir)
|
|
continue
|
|
dir_ts = dirstat.st_mtime
|
|
rinfo = self.session.repoInfo(repo_id)
|
|
if rinfo is None:
|
|
if not self.options.ignore_stray_repos:
|
|
age = time.time() - dir_ts
|
|
self.logger.debug("did not expect %s; age: %s",
|
|
repodir, age)
|
|
if age > max_age:
|
|
self.logger.info(
|
|
"Removing unexpected directory (no such repo): %s", repodir)
|
|
if symlink:
|
|
os.unlink(repodir)
|
|
else:
|
|
self.rmtree(repodir)
|
|
continue
|
|
if rinfo['tag_name'] != tag:
|
|
try:
|
|
# possible rename of tag, repo.json should exist and contain tag id
|
|
repo_json = koji.load_json(os.path.join(repodir, 'repo.json'))
|
|
if rinfo['tag_id'] != repo_json['tag_id']:
|
|
self.logger.warning(
|
|
"Tag name/id mismatch: directory: %s, name: %s, id: %s",
|
|
tag, rinfo['tag_name'], repo_json['tag_id'])
|
|
continue
|
|
except Exception:
|
|
self.logger.warning(
|
|
"Tag name mismatch (rename?): %s vs %s", tag, rinfo['tag_name'])
|
|
continue
|
|
if rinfo['state'] == koji.REPO_DELETED:
|
|
# Note that we already checked delete_queue above
|
|
age = time.time() - max(rinfo.get('state_ts', 0), dir_ts)
|
|
self.logger.debug("potential removal candidate: %s; age: %s" % (repodir, age))
|
|
if age > max_age:
|
|
logger.info("Removing stray repo (state=%s): %s",
|
|
koji.REPO_STATES[rinfo['state']], repodir)
|
|
if symlink:
|
|
os.unlink(repodir)
|
|
else:
|
|
self.rmtree(repodir)
|
|
|
|
def updateRepos(self):
|
|
self.logger.debug("Updating repos")
|
|
|
|
self.readCurrentRepos()
|
|
|
|
for repo in to_list(self.repos.values()):
|
|
if repo.state == koji.REPO_INIT:
|
|
repo.check_init()
|
|
elif repo.state == koji.REPO_READY:
|
|
repo.expire_check()
|
|
elif repo.state == koji.REPO_EXPIRED:
|
|
repo.delete_check()
|
|
elif repo.state == koji.REPO_PROBLEM:
|
|
repo.handle_problem()
|
|
|
|
|
|
def start_thread(session, repomgr, name):
    """Start a daemon thread running repomgr.threadLoop with its own subsession

    Returns the started Thread object (named after the handler).
    """
    subsession = session.subsession()
    thread = threading.Thread(
        name=name,
        target=repomgr.threadLoop,
        args=(subsession, name),
        daemon=True,
    )
    thread.start()
    return thread
|
|
|
|
|
|
def main(options, session):
    """Main loop: track repos, prune disk, and supervise worker threads

    Runs until interrupted, SIGTERM, or an auth error. The worker threads
    (currency/regen/autoregen/rmtree and optionally check_external) each
    get their own subsession and are restarted if they die.

    NOTE(review): relies on the module-level ``logger`` created in the
    ``__main__`` section -- this function only works when the file is run
    as a script.
    """
    repomgr = RepoManager(options, session)
    repomgr.readCurrentRepos()

    def shutdown(*args):
        raise SystemExit
    # translate SIGTERM into SystemExit for a clean shutdown path below
    signal.signal(signal.SIGTERM, shutdown)
    tnames = ['currency', 'regen', 'autoregen', 'rmtree']
    if options.check_external_repos:
        tnames.append('check_external')
    threads = {name: start_thread(session, repomgr, name) for name in tnames}
    logger.info("Entering main loop")
    exit_code = 0
    while True:
        try:
            repomgr.updateRepos()
            repomgr.printState()
            repomgr.pruneLocalRepos()
            # supervise worker threads; restart any that died
            for name in tnames:
                if not threads[name].is_alive():
                    logger.error(f'{name} thread died. Restarting it.')
                    threads[name] = start_thread(session, repomgr, name)
        except KeyboardInterrupt:
            logger.warning("User exit")
            break
        except koji.AuthExpired:
            logger.warning("Session expired")
            exit_code = 1
            break
        except koji.AuthError:
            logger.warning("Authentication error")
            exit_code = 1
            break
        except SystemExit:
            logger.warning("Shutting down")
            break
        except Exception:
            # log the exception and continue
            logger.error(''.join(traceback.format_exception(*sys.exc_info())))
        # sleep between cycles; Ctrl-C during the sleep also exits cleanly
        try:
            time.sleep(options.sleeptime)
        except KeyboardInterrupt:
            logger.warning("User exit")
            break
    # final cleanup: reap/kill rmtree children, then log out
    try:
        repomgr.checkQueue()
        repomgr.killChildren()
    finally:
        session.logout()
    sys.exit(exit_code)
|
|
|
|
|
|
def get_options():
    """process options from command line and config file

    Precedence: command-line values win; config-file values fill in the
    rest; hard-coded defaults fill in the remainder. Exits via quit() on
    unknown config sections or options.
    """
    # parse command line args
    parser = OptionParser("usage: %prog [opts]")
    parser.add_option("-c", "--config", dest="configFile",
                      help="use alternate configuration file", metavar="FILE",
                      default="/etc/kojira/kojira.conf")
    parser.add_option("--user", help="specify user")
    parser.add_option("--password", help="specify password")
    parser.add_option("--principal", help="Kerberos principal")
    parser.add_option("--keytab", help="Kerberos keytab")
    parser.add_option("-f", "--fg", dest="daemon",
                      action="store_false", default=True,
                      help="run in foreground")
    parser.add_option("-d", "--debug", action="store_true",
                      help="show debug output")
    parser.add_option("-q", "--quiet", action="store_true",
                      help="don't show warnings")
    parser.add_option("-v", "--verbose", action="store_true",
                      help="show verbose output")
    parser.add_option("--force-lock", action="store_true", default=False,
                      help="force lock for exclusive session")
    parser.add_option("--debug-xmlrpc", action="store_true", default=False,
                      help="show xmlrpc debug output")
    parser.add_option("--skip-main", action="store_true", default=False,
                      help="don't actually run main")
    parser.add_option("--show-config", action="store_true", default=False,
                      help="Show config and exit")
    parser.add_option("--sleeptime", type='int', help="Specify the polling interval")
    parser.add_option("-s", "--server", help="URL of XMLRPC server")
    parser.add_option("--topdir", help="Specify topdir")
    parser.add_option("--logfile", help="Specify logfile")
    parser.add_option("--queue-file", help=SUPPRESS_HELP)
    (options, args) = parser.parse_args()

    config = koji.read_config_files(options.configFile)
    section = 'kojira'
    # only the [kojira] section is allowed
    for x in config.sections():
        if x != section:
            quit('invalid section found in config file: %s' % x)
    defaults = {'debuginfo_tags': '',
                'source_tags': '',
                'separate_source_tags': '',
                'ignore_tags': '',
                'verbose': False,
                'debug': False,
                'ignore_stray_repos': False,
                'topdir': '/mnt/koji',
                'server': None,
                'logfile': '/var/log/kojira.log',
                'principal': None,
                'keytab': '/etc/kojira/kojira.keytab',
                'ccache': '/var/tmp/kojira.ccache',
                'retry_interval': 60,
                'max_retries': 120,
                'offline_retry': True,
                'offline_retry_interval': 120,
                'no_ssl_verify': False,
                'max_delete_processes': 4,
                'max_repo_tasks': 4,
                'max_repo_tasks_maven': 2,
                'repo_tasks_limit': 10,
                'repo_lifetime': 7 * 24 * 3600,
                'dist_repo_lifetime': 7 * 24 * 3600,
                'expired_repo_lifetime': None,  # default handled below
                'deleted_repo_lifetime': None,  # compat alias for expired_repo_lifetime
                'init_timeout': 7200,
                'recheck_period': 3600,
                'reference_recheck_period': None,  # defaults to recheck_period
                'no_repo_effective_age': 2 * 24 * 3600,
                'check_external_repos': True,
                'sleeptime': 15,
                'cert': None,
                'serverca': None,
                'queue_file': None,
                'ignore_other_volumes': False,
                }
    if config.has_section(section):
        # options are grouped by the parser used to read them
        int_opts = ('deleted_repo_lifetime',
                    'retry_interval', 'max_retries', 'offline_retry_interval',
                    'max_delete_processes', 'dist_repo_lifetime',
                    'sleeptime', 'expired_repo_lifetime',
                    'repo_lifetime', 'recheck_period', 'reference_recheck_period')
        str_opts = ('topdir', 'server', 'user', 'password', 'logfile', 'principal', 'keytab',
                    'cert', 'serverca', 'ccache')
        bool_opts = ('verbose', 'debug', 'ignore_stray_repos', 'offline_retry',
                     'no_ssl_verify', 'check_external_repos', 'ignore_other_volumes')
        # accepted but ignored, with a deprecation warning
        legacy_opts = ('delete_batch_size',
                       'debuginfo_tags',
                       'ignore_tags',
                       'max_repo_tasks',
                       'max_repo_tasks_maven',
                       'no_repo_effective_age',
                       'queue_file',
                       'recent_tasks_lifetime',
                       'repo_tasks_limit',
                       'source_tags',
                       'separate_source_tags',
                       'with_src')
        for name in config.options(section):
            if name in int_opts:
                defaults[name] = config.getint(section, name)
            elif name in str_opts:
                defaults[name] = config.get(section, name)
            elif name in bool_opts:
                defaults[name] = config.getboolean(section, name)
            elif name in legacy_opts:
                deprecated('The %s configuration option is no longer used\n' % name)
            else:
                quit("unknown config option: %s" % name)
    # command-line values (non-None) take precedence over config/defaults
    for name, value in defaults.items():
        if getattr(options, name, None) is None:
            setattr(options, name, value)
    # resolve the deleted_repo_lifetime -> expired_repo_lifetime compat alias
    if options.deleted_repo_lifetime is not None:
        deprecated('The deleted_repo_lifetime option is deprecated. Use expired_repo_lifetime.\n')
        if options.expired_repo_lifetime is not None:
            sys.stderr.write('Ignoring deleted_repo_lifetime because expired_repo_lifetime was '
                             'specified\n')
        else:
            options.expired_repo_lifetime = options.deleted_repo_lifetime
    elif options.expired_repo_lifetime is None:
        options.expired_repo_lifetime = 7 * 24 * 3600
    if options.reference_recheck_period is None:
        options.reference_recheck_period = options.recheck_period
    # allow explicitly disabling the logfile
    if options.logfile in ('', 'None', 'none'):
        options.logfile = None
    # special handling for cert defaults: only use them if the file exists
    cert_defaults = {
        'cert': '/etc/kojira/client.crt',
        'serverca': '/etc/kojira/serverca.crt',
    }
    for name in cert_defaults:
        if getattr(options, name, None) is None:
            fn = cert_defaults[name]
            if os.path.exists(fn):
                setattr(options, name, fn)
    return options
|
|
|
|
|
|
def quit(msg=None, code=1):
    """Log msg (if any) to the error log and stderr, then exit with code"""
    if msg:
        logging.getLogger("koji.repo").error(msg)
        print(msg, file=sys.stderr, flush=True)
    sys.exit(code)
|
|
|
|
|
|
if __name__ == "__main__":

    options = get_options()
    # module-level globals used throughout (pathinfo, logger, session)
    topdir = getattr(options, 'topdir', None)
    pathinfo = koji.PathInfo(topdir)
    if options.show_config:
        pprint.pprint(options.__dict__)
        sys.exit()
    if options.logfile:
        # verify we can create/write the logfile before daemonizing
        if not os.path.exists(options.logfile):
            try:
                logfile = open(options.logfile, "w")
                logfile.close()
            except Exception:
                sys.stderr.write("Cannot create logfile: %s\n" % options.logfile)
                sys.exit(1)
        if not os.access(options.logfile, os.W_OK):
            sys.stderr.write("Cannot write to logfile: %s\n" % options.logfile)
            sys.exit(1)
        koji.add_file_logger("koji", options.logfile)
    # note we're setting logging for koji.*
    logger = logging.getLogger("koji")
    if options.debug:
        logger.setLevel(logging.DEBUG)
    elif options.verbose:
        logger.setLevel(logging.INFO)
    elif options.quiet:
        logger.setLevel(logging.ERROR)
    else:
        logger.setLevel(logging.WARNING)

    session_opts = koji.grab_session_options(options)
    session = koji.ClientSession(options.server, session_opts)
    # authenticate: SSL cert, then user/password, then GSSAPI
    try:
        if options.cert is not None and os.path.isfile(options.cert):
            # authenticate using SSL client certificates
            session.ssl_login(options.cert, None, options.serverca)
        elif options.user:
            # authenticate using user/password
            session.login()
        elif koji.reqgssapi and options.principal and options.keytab:
            session.gssapi_login(options.principal, options.keytab, options.ccache)
        else:
            quit("No username/password/certificate supplied and Kerberos missing or "
                 "not configured")
    except koji.AuthError as ex:
        quit(str(ex))
    # get an exclusive session
    try:
        session.exclusiveSession(force=options.force_lock)
    except koji.AuthLockError:
        quit("Error: Unable to get lock. Trying using --force-lock")
    if not session.logged_in:
        quit("Error: Unknown login error")
    # NOTE(review): this second logged_in check is unreachable -- the one
    # above already exits when not logged in
    if not session.logged_in:
        print("Error: unable to log in")
        sys.exit(1)
    if options.skip_main:
        sys.exit()
    elif options.daemon:
        koji.daemonize()
    else:
        koji.add_stderr_logger("koji")
    main(options, session)
|