move regens into thread

This commit is contained in:
Mike McLean 2018-02-06 12:02:39 -06:00
parent bda8e7adfa
commit 557dcce1b5

View file

@ -194,7 +194,8 @@ class RepoManager(object):
def __init__(self, options, session):
self.options = options
self.session = session
self._local = threading.local()
self._local.session = session
self.repos = {}
self.tasks = {}
self.other_tasks = {}
@ -204,6 +205,15 @@ class RepoManager(object):
self.delete_queue = []
self.logger = logging.getLogger("koji.repo.manager")
@property
def session(self):
# session is stored in our threadlocal instance
return self._local.session
@session.setter
def session(self, value):
self._local.session = value
def printState(self):
self.logger.debug('Tracking %i repos, %i child processes', len(self.repos), len(self.delete_pids))
for tag_id, task_id in self.tasks.iteritems():
@ -299,10 +309,8 @@ class RepoManager(object):
self.logger.info('Dropping entry for inactive repo: %s', repo_id)
del self.repos[repo_id]
def checkCurrentRepos(self, session=None):
def checkCurrentRepos(self):
"""Determine which repos are current"""
if session is None:
session = self.session
to_check = []
repo_ids = self.repos.keys()
for repo_id in repo_ids:
@ -325,10 +333,8 @@ class RepoManager(object):
self.logger.debug("Skipped check for repos: %r", skipped)
if not to_check:
return
#session.multicall = True
for repo in to_check:
changed = session.tagChangedSinceEvent(repo.event_id, repo.taglist)
#for repo, [changed] in zip(to_check, session.multiCall(strict=True)):
changed = self.session.tagChangedSinceEvent(repo.event_id, repo.taglist)
if changed:
self.logger.info("Repo %i no longer current", repo.repo_id)
repo.current = False
@ -336,10 +342,11 @@ class RepoManager(object):
def currencyChecker(self, session):
"""Continually checks repos for currency. Runs as a separate thread"""
self.session = session
self.logger.info('currencyChecker starting')
try:
while True:
self.checkCurrentRepos(session)
self.checkCurrentRepos()
time.sleep(self.options.sleeptime)
except:
logger.exception('Error in currency checker thread')
@ -347,6 +354,20 @@ class RepoManager(object):
finally:
session.logout()
def regenLoop(self, session):
"""Triggers regens as needed/possible. Runs in a separate thread"""
self.session = session
self.logger.info('regenLoop starting')
try:
while True:
self.regenRepos()
time.sleep(self.options.sleeptime)
except:
logger.exception('Error in regen thread')
raise
finally:
session.logout()
def pruneLocalRepos(self, topdir, timername):
"""Scan filesystem for repos and remove any deleted ones
@ -480,8 +501,6 @@ class RepoManager(object):
# find out which tags require repos
self.checkNeeded()
self.regenRepos()
self.updateTagScores()
# trigger deletes
@ -681,6 +700,16 @@ def start_currency_checker(session, repomgr):
thread.start()
return thread
def start_regen_loop(session, repomgr):
subsession = session.subsession()
thread = threading.Thread(name='regenLoop',
target=repomgr.regenLoop, args=(subsession,))
thread.setDaemon(True)
thread.start()
return thread
def main(options, session):
repomgr = RepoManager(options, session)
repomgr.readCurrentRepos()
@ -688,6 +717,7 @@ def main(options, session):
raise SystemExit
signal.signal(signal.SIGTERM,shutdown)
curr_chk_thread = start_currency_checker(session, repomgr)
regen_thread = start_regen_loop(session, repomgr)
# TODO also move rmtree jobs to threads
logger.info("Entering main loop")
repodir = "%s/repos" % pathinfo.topdir
@ -702,6 +732,9 @@ def main(options, session):
if not curr_chk_thread.isAlive():
logger.error("Currency checker thread died. Restarting it.")
curr_chk_thread = start_currency_checker(session, repomgr)
if not regen_thread.isAlive():
logger.error("Currency checker thread died. Restarting it.")
rege_thread = start_regen_loop(session, repomgr)
except KeyboardInterrupt:
logger.warn("User exit")
break