PR#516: kojira monitors external repos changes
Merges #516 https://pagure.io/koji/pull-request/516 Fixes: #512 https://pagure.io/koji/issue/512 RFE: kojira: monitor external repos
This commit is contained in:
commit
9639f78ca7
1 changed files with 98 additions and 7 deletions
105
util/kojira
105
util/kojira
|
|
@ -35,7 +35,9 @@ import threading
|
|||
import time
|
||||
import traceback
|
||||
from optparse import OptionParser
|
||||
from xml.etree import ElementTree
|
||||
|
||||
import requests
|
||||
import six
|
||||
|
||||
import koji
|
||||
|
|
@ -250,6 +252,7 @@ class RepoManager(object):
|
|||
self._local = threading.local()
|
||||
self._local.session = session
|
||||
self.repos = {}
|
||||
self.external_repos = {}
|
||||
self.tasks = {}
|
||||
self.recent_tasks = {}
|
||||
self.other_tasks = {}
|
||||
|
|
@ -370,8 +373,37 @@ class RepoManager(object):
|
|||
self.logger.info('Dropping entry for inactive repo: %s', repo_id)
|
||||
del self.repos[repo_id]
|
||||
|
||||
def checkCurrentRepos(self):
|
||||
"""Determine which repos are current"""
|
||||
def checkExternalRepo(self, ts, repodata, tag):
|
||||
"""Determine which external repos are current, return True if remote repo is newer"""
|
||||
url = repodata['url']
|
||||
if url not in self.external_repos:
|
||||
self.external_repos[url] = 0
|
||||
arches = [] # placeholder for repos without $arch bit
|
||||
try:
|
||||
arches = self.session.getTag(tag)['arches'].split()
|
||||
except AttributeError:
|
||||
pass
|
||||
for arch in arches:
|
||||
if '$arch' in url:
|
||||
arch_url = url.replace('$arch', arch)
|
||||
else:
|
||||
arch_url = url
|
||||
arch_url = os.path.join(arch_url, 'repodata/repomd.xml')
|
||||
self.logger.debug('Checking external url: %s' % arch_url)
|
||||
try:
|
||||
r = requests.get(arch_url, timeout=5)
|
||||
root = ElementTree.fromstring(r.text)
|
||||
for child in root.iter('{http://linux.duke.edu/metadata/repo}timestamp'):
|
||||
remote_ts = int(child.text)
|
||||
if remote_ts > self.external_repos[url]:
|
||||
self.external_repos[url] = remote_ts
|
||||
except Exception:
|
||||
# inaccessible or without timestamps
|
||||
# treat repo as unchanged (ts = 0)
|
||||
pass
|
||||
return ts < self.external_repos[url]
|
||||
|
||||
def reposToCheck(self):
|
||||
to_check = []
|
||||
repo_ids = to_list(self.repos.keys())
|
||||
for repo_id in repo_ids:
|
||||
|
|
@ -392,11 +424,40 @@ class RepoManager(object):
|
|||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
skipped = set(repo_ids).difference([r.repo_id for r in to_check])
|
||||
self.logger.debug("Skipped check for repos: %r", skipped)
|
||||
if not to_check:
|
||||
return
|
||||
for repo in to_check:
|
||||
changed = self.session.tagChangedSinceEvent(repo.event_id, repo.taglist)
|
||||
return to_check
|
||||
|
||||
def checkExternalRepos(self):
|
||||
"""Determine which external repos changed"""
|
||||
# clean external repo cache
|
||||
self.external_repos = {}
|
||||
for repo in self.reposToCheck():
|
||||
changed = False
|
||||
for tag in repo.taglist:
|
||||
try:
|
||||
external_repos = self.session.getExternalRepoList(tag)
|
||||
except koji.GenericError:
|
||||
# in case tag was deleted, checkCurrentRepos is
|
||||
# responsible for cleanup, ignore it here
|
||||
external_repos = []
|
||||
for external_repo in external_repos:
|
||||
changed = self.checkExternalRepo(repo.event_ts, external_repo, tag)
|
||||
self.logger.debug("Check external repo %s [%s] for tag %s: %s" % (
|
||||
external_repo['external_repo_id'], external_repo['url'],
|
||||
tag, changed))
|
||||
if changed:
|
||||
break
|
||||
if changed:
|
||||
break
|
||||
if changed:
|
||||
self.logger.info("Repo %i no longer current due to external repo change" %
|
||||
repo.repo_id)
|
||||
repo.current = False
|
||||
repo.expire_ts = time.time()
|
||||
|
||||
def checkCurrentRepos(self):
|
||||
"""Determine which repos are current"""
|
||||
for repo in self.reposToCheck():
|
||||
if self.session.tagChangedSinceEvent(repo.event_id, repo.taglist):
|
||||
self.logger.info("Repo %i no longer current", repo.repo_id)
|
||||
repo.current = False
|
||||
repo.expire_ts = time.time()
|
||||
|
|
@ -416,6 +477,21 @@ class RepoManager(object):
|
|||
finally:
|
||||
session.logout()
|
||||
|
||||
def currencyExternalChecker(self, session):
|
||||
"""Continually checks repos for external repo currency. Runs as a separate thread"""
|
||||
self.session = session
|
||||
self.logger = logging.getLogger("koji.repo.currency_external")
|
||||
self.logger.info('currencyExternalChecker starting')
|
||||
try:
|
||||
while True:
|
||||
self.checkExternalRepos()
|
||||
time.sleep(self.options.sleeptime)
|
||||
except Exception:
|
||||
self.logger.exception('Error in external currency checker thread')
|
||||
raise
|
||||
finally:
|
||||
session.logout()
|
||||
|
||||
def regenLoop(self, session):
|
||||
"""Triggers regens as needed/possible. Runs in a separate thread"""
|
||||
self.session = session
|
||||
|
|
@ -815,6 +891,15 @@ def start_currency_checker(session, repomgr):
|
|||
return thread
|
||||
|
||||
|
||||
def start_external_currency_checker(session, repomgr):
|
||||
subsession = session.subsession()
|
||||
thread = threading.Thread(name='currencyExternalChecker',
|
||||
target=repomgr.currencyExternalChecker, args=(subsession,))
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
return thread
|
||||
|
||||
|
||||
def start_regen_loop(session, repomgr):
|
||||
subsession = session.subsession()
|
||||
thread = threading.Thread(name='regenLoop',
|
||||
|
|
@ -832,6 +917,8 @@ def main(options, session):
|
|||
raise SystemExit
|
||||
signal.signal(signal.SIGTERM, shutdown)
|
||||
curr_chk_thread = start_currency_checker(session, repomgr)
|
||||
if options.check_external_repos:
|
||||
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
|
||||
regen_thread = start_regen_loop(session, repomgr)
|
||||
# TODO also move rmtree jobs to threads
|
||||
logger.info("Entering main loop")
|
||||
|
|
@ -844,6 +931,9 @@ def main(options, session):
|
|||
if not curr_chk_thread.isAlive():
|
||||
logger.error("Currency checker thread died. Restarting it.")
|
||||
curr_chk_thread = start_currency_checker(session, repomgr)
|
||||
if options.check_external_repos and not curr_ext_chk_thread.isAlive():
|
||||
logger.error("External currency checker thread died. Restarting it.")
|
||||
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
|
||||
if not regen_thread.isAlive():
|
||||
logger.error("Regeneration thread died. Restarting it.")
|
||||
regen_thread = start_regen_loop(session, repomgr)
|
||||
|
|
@ -940,6 +1030,7 @@ def get_options():
|
|||
'deleted_repo_lifetime': 7 * 24 * 3600,
|
||||
# XXX should really be called expired_repo_lifetime
|
||||
'dist_repo_lifetime': 7 * 24 * 3600,
|
||||
'check_external_repos': True,
|
||||
'recent_tasks_lifetime': 600,
|
||||
'sleeptime': 15,
|
||||
'cert': None,
|
||||
|
|
@ -956,7 +1047,7 @@ def get_options():
|
|||
'krbservice', 'cert', 'ca', 'serverca', 'debuginfo_tags',
|
||||
'source_tags', 'separate_source_tags', 'ignore_tags') # FIXME: remove ca here
|
||||
bool_opts = ('verbose', 'debug', 'ignore_stray_repos', 'offline_retry',
|
||||
'krb_rdns', 'krb_canon_host', 'no_ssl_verify')
|
||||
'krb_rdns', 'krb_canon_host', 'no_ssl_verify', 'check_external_repos')
|
||||
legacy_opts = ('with_src')
|
||||
for name in config.options(section):
|
||||
if name in int_opts:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue