PR#4277: kojira: split currency and regen

Merges #4277
https://pagure.io/koji/pull-request/4277

Fixes: #4278
https://pagure.io/koji/issue/4278
repo requests can be delayed of there are a lot of tags with auto regen
This commit is contained in:
Tomas Kopecek 2025-02-18 12:09:18 +01:00
commit 4270cdd4c1
5 changed files with 134 additions and 143 deletions

View file

@ -301,6 +301,9 @@ class RepoWatcher(object):
raise koji.GenericError("Unsuccessfully waited %s for a new %s repo" %
(koji.util.duration(self.start), self.taginfo['name']))
if not check['request']['active']:
if check['request']['task_id']:
tstate = koji.TASK_STATES[check['request']['task_state']]
self.logger.error('Task %s state is %s', check['request']['task_id'], tstate)
raise koji.GenericError("Repo request no longer active")
self.pause()

View file

@ -391,7 +391,7 @@ def update_end_events():
n_cached = 0
tag_last = {}
updates = []
for repo in query.execute():
for repo in repos:
tag_id = repo['tag_id']
# use cache to avoid redundant calls
if tag_id in tag_last and tag_last[tag_id] <= repo['create_event']:

View file

@ -0,0 +1,89 @@
from __future__ import absolute_import
import os.path
import shutil
import signal
import tempfile
import time
import unittest
import mock
import pytest
import koji
from . import loadkojira
kojira = loadkojira.kojira
class MyError(Exception):
"""sentinel exception"""
pass
class MainTest(unittest.TestCase):
def setUp(self):
self.start_thread = mock.patch.object(kojira, 'start_thread').start()
self.repomgr = mock.MagicMock()
self.RepoManager = mock.patch.object(kojira, 'RepoManager', return_value=self.repomgr).start()
self.session = mock.MagicMock()
self.options = mock.MagicMock()
self.options.sleeptime = 1
self.workdir = tempfile.mkdtemp()
self.topdir = self.workdir + '/koji'
self.pathinfo = koji.PathInfo(self.topdir)
mock.patch.object(kojira, 'pathinfo', create=True, new=self.pathinfo).start()
self.logger = mock.patch.object(kojira, 'logger', create=True).start()
self.sleep = mock.patch('time.sleep').start()
self.signal = mock.patch('signal.signal').start()
def tearDown(self):
mock.patch.stopall()
shutil.rmtree(self.workdir)
def test_userkill1(self):
self.sleep.side_effect = [None] * 10 + [KeyboardInterrupt()]
with self.assertRaises(SystemExit) as ex:
kojira.main(self.options, self.session)
self.assertEqual(ex.exception.code, 0)
def test_terminal_errors(self):
for cls in KeyboardInterrupt, koji.AuthExpired, koji.AuthError, SystemExit:
err = cls()
self.sleep.side_effect = [None] * 10 + [Exception()]
self.repomgr.updateRepos.side_effect = [None] * 5 + [err]
with self.assertRaises(SystemExit):
kojira.main(self.options, self.session)
self.assertEqual(len(self.repomgr.pruneLocalRepos.mock_calls), 5)
self.repomgr.reset_mock()
def test_nonterminal_error(self):
err = MyError()
self.sleep.side_effect = [None] * 10 + [KeyboardInterrupt()]
self.repomgr.updateRepos.side_effect = [None] * 5 + [err] * 6
with self.assertRaises(SystemExit) as ex:
kojira.main(self.options, self.session)
self.assertEqual(ex.exception.code, 0)
self.assertEqual(len(self.repomgr.updateRepos.mock_calls), 11)
self.assertEqual(len(self.repomgr.pruneLocalRepos.mock_calls), 5)
self.repomgr.reset_mock()
def test_shutdown_handler(self):
self.signal.side_effect = [Exception('stop here')]
with self.assertRaises(Exception):
kojira.main(self.options, self.session)
# grab the handler
self.assertEqual(self.signal.mock_calls[0][1][0], signal.SIGTERM)
handler = self.signal.mock_calls[0][1][1]
self.signal.side_effect = None
# make sure the handler does what it should
with self.assertRaises(SystemExit):
handler()
# the end

View file

@ -42,54 +42,16 @@ class RepoManagerTest(unittest.TestCase):
return pid, 0
@mock.patch('time.sleep')
def test_regen_loop(self, sleep):
def test_thread_loop(self, sleep):
subsession = mock.MagicMock()
self.mgr.regenRepos = mock.MagicMock()
self.mgr.regenRepos.side_effect = [None] * 10 + [OurException()]
self.mgr.do_regen = mock.MagicMock()
self.mgr.do_regen.side_effect = [None] * 10 + [OurException()]
# we need the exception to terminate the infinite loop
with self.assertRaises(OurException):
self.mgr.regenLoop(subsession)
self.mgr.threadLoop(subsession, 'regen')
self.assertEqual(self.mgr.regenRepos.call_count, 11)
subsession.logout.assert_called_once()
@mock.patch('time.sleep')
def test_rmtree_loop(self, sleep):
subsession = mock.MagicMock()
self.mgr.checkQueue = mock.MagicMock()
self.mgr.checkQueue.side_effect = [None] * 10 + [OurException()]
# we need the exception to terminate the infinite loop
with self.assertRaises(OurException):
self.mgr.rmtreeLoop(subsession)
self.assertEqual(self.mgr.checkQueue.call_count, 11)
subsession.logout.assert_called_once()
@mock.patch('time.sleep')
def test_currency_loop(self, sleep):
subsession = mock.MagicMock()
subsession.repo.updateEndEvents.side_effect = [None] * 10 + [OurException()]
# we need the exception to terminate the infinite loop
with self.assertRaises(OurException):
self.mgr.currencyChecker(subsession)
self.assertEqual(subsession.repo.updateEndEvents.call_count, 11)
subsession.logout.assert_called_once()
@mock.patch('time.sleep')
def test_external_loop(self, sleep):
subsession = mock.MagicMock()
self.mgr.checkExternalRepos = mock.MagicMock()
self.mgr.checkExternalRepos.side_effect = [None] * 10 + [OurException()]
# we need the exception to terminate the infinite loop
with self.assertRaises(OurException):
self.mgr.currencyExternalChecker(subsession)
self.assertEqual(self.mgr.checkExternalRepos.call_count, 11)
self.assertEqual(self.mgr.do_regen.call_count, 11)
subsession.logout.assert_called_once()
def test_rmtree(self):

View file

@ -443,65 +443,43 @@ class RepoManager(object):
data['id'] = erepo_id
self.checkExternalRepo(data, arches, ts_cache)
def currencyChecker(self, session):
"""Continually checks repos for currency. Runs as a separate thread"""
def threadLoop(self, session, name):
"""Wrapper for running thread handlers in a loop"""
# we should be passed a subsession of main
self.session = session
self.logger = logging.getLogger("koji.repo.currency")
self.logger.info('currencyChecker starting')
handler = getattr(self, f'do_{name}')
self.logger = logging.getLogger(f'koji.repo.{name}')
self.logger.info(f'{name} thread starting')
try:
while True:
self.session.repo.updateEndEvents()
# TODO does this still need to be its own thread?
handler()
time.sleep(self.options.sleeptime)
except Exception:
self.logger.exception('Error in currency checker thread')
self.logger.exception(f'Error in {name} thread')
raise
finally:
session.logout()
def currencyExternalChecker(self, session):
"""Continually checks repos for external repo currency. Runs as a separate thread"""
self.session = session
self.logger = logging.getLogger("koji.repo.currency_external")
self.logger.info('currencyExternalChecker starting')
try:
while True:
self.checkExternalRepos()
time.sleep(self.options.sleeptime)
except Exception:
self.logger.exception('Error in external currency checker thread')
raise
finally:
session.logout()
def do_currency(self):
"""Checks repos for currency"""
# this call can take a while
self.session.repo.updateEndEvents()
def regenLoop(self, session):
"""Triggers regens as needed/possible. Runs in a separate thread"""
self.session = session
self.logger = logging.getLogger("koji.repo.regen")
self.logger.info('regenLoop starting')
try:
while True:
self.regenRepos()
time.sleep(self.options.sleeptime)
except Exception:
self.logger.exception('Error in regen thread')
raise
finally:
session.logout()
def do_check_external(self):
"""Check external repos"""
self.checkExternalRepos()
def rmtreeLoop(self, session):
self.session = session
logger = logging.getLogger("koji.repo.rmtree")
try:
while True:
logger.debug('queue length: %d', len(self.delete_queue))
self.checkQueue()
time.sleep(self.options.sleeptime)
except Exception:
logger.exception('Error in rmtree thread')
raise
finally:
session.logout()
def do_regen(self):
"""Triggers regens as needed/possible"""
self.session.repo.checkQueue()
def do_autoregen(self):
"""Triggers automatic regens as needed/possible"""
self.session.repo.autoRequests()
def do_rmtree(self):
logger.debug('queue length: %d', len(self.delete_queue))
self.checkQueue()
def pruneLocalRepos(self):
# non-dist repos are always on the default volume
@ -649,43 +627,11 @@ class RepoManager(object):
elif repo.state == koji.REPO_PROBLEM:
repo.handle_problem()
def regenRepos(self):
"""Trigger repo requests as needed"""
self.session.repo.autoRequests()
self.session.repo.checkQueue()
def start_currency_checker(session, repomgr):
def start_thread(session, repomgr, name):
handler = getattr(repomgr, 'threadLoop')
subsession = session.subsession()
thread = threading.Thread(name='currencyChecker',
target=repomgr.currencyChecker, args=(subsession,))
thread.daemon = True
thread.start()
return thread
def start_external_currency_checker(session, repomgr):
subsession = session.subsession()
thread = threading.Thread(name='currencyExternalChecker',
target=repomgr.currencyExternalChecker, args=(subsession,))
thread.daemon = True
thread.start()
return thread
def start_regen_loop(session, repomgr):
subsession = session.subsession()
thread = threading.Thread(name='regenLoop',
target=repomgr.regenLoop, args=(subsession,))
thread.daemon = True
thread.start()
return thread
def start_rmtree_loop(session, repomgr):
subsession = session.subsession()
thread = threading.Thread(name='rmtreeLoop',
target=repomgr.rmtreeLoop, args=(subsession,))
thread = threading.Thread(name=name, target=handler, args=(subsession, name))
thread.daemon = True
thread.start()
return thread
@ -698,11 +644,10 @@ def main(options, session):
def shutdown(*args):
raise SystemExit
signal.signal(signal.SIGTERM, shutdown)
curr_chk_thread = start_currency_checker(session, repomgr)
tnames = ['currency', 'regen', 'autoregen', 'rmtree']
if options.check_external_repos:
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
regen_thread = start_regen_loop(session, repomgr)
rmtree_thread = start_rmtree_loop(session, repomgr)
tnames.append('check_external')
threads = {name: start_thread(session, repomgr, name) for name in tnames}
logger.info("Entering main loop")
exit_code = 0
while True:
@ -710,18 +655,10 @@ def main(options, session):
repomgr.updateRepos()
repomgr.printState()
repomgr.pruneLocalRepos()
if not curr_chk_thread.is_alive():
logger.error("Currency checker thread died. Restarting it.")
curr_chk_thread = start_currency_checker(session, repomgr)
if options.check_external_repos and not curr_ext_chk_thread.is_alive():
logger.error("External currency checker thread died. Restarting it.")
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
if not regen_thread.is_alive():
logger.error("Regeneration thread died. Restarting it.")
regen_thread = start_regen_loop(session, repomgr)
if not rmtree_thread.is_alive():
logger.error("rmtree thread died. Restarting it.")
rmtree_thread = start_rmtree_loop(session, repomgr)
for name in tnames:
if not threads[name].is_alive():
logger.error(f'{name} thread died. Restarting it.')
threads[name] = start_thread(session, repomgr, name)
except KeyboardInterrupt:
logger.warning("User exit")
break