kojira: parallelize rmtree

Fixes: https://pagure.io/koji/issue/2398
This commit is contained in:
Tomas Kopecek 2020-08-18 14:19:27 +02:00
parent fae5bcc833
commit 2e042db3af

View file

@ -255,7 +255,7 @@ class ManagedRepo(object):
else: else:
realpath = path realpath = path
try: try:
rmtree(realpath) self.manager.rmtree(realpath)
except BaseException: except BaseException:
logger.error(''.join(traceback.format_exception(*sys.exc_info()))) logger.error(''.join(traceback.format_exception(*sys.exc_info())))
@ -309,7 +309,6 @@ class RepoManager(object):
"""Spawn (or queue) and rmtree job""" """Spawn (or queue) and rmtree job"""
self.logger.info("Queuing rmtree job for %s", path) self.logger.info("Queuing rmtree job for %s", path)
self.delete_queue.append(path) self.delete_queue.append(path)
self.checkQueue()
def checkQueue(self): def checkQueue(self):
finished = [pid for pid in self.delete_pids if self.waitPid(pid)] finished = [pid for pid in self.delete_pids if self.waitPid(pid)]
@ -548,6 +547,20 @@ class RepoManager(object):
finally: finally:
session.logout() session.logout()
def rmtreeLoop(self, session):
self.session = session
logger = logging.getLogger("koji.repo.rmtree")
try:
while True:
logger.debug('queue length: %d', len(self.delete_queue))
self.checkQueue()
time.sleep(self.options.sleeptime)
except Exception:
logger.exception('Error in delete thread')
raise
finally:
session.logout()
def pruneLocalRepos(self): def pruneLocalRepos(self):
for volinfo in self.session.listVolumes(): for volinfo in self.session.listVolumes():
volumedir = pathinfo.volumedir(volinfo['name']) volumedir = pathinfo.volumedir(volinfo['name'])
@ -934,7 +947,7 @@ class RepoManager(object):
if repo.tryDelete(self.delete_logger): if repo.tryDelete(self.delete_logger):
del self.repos[repo.repo_id] del self.repos[repo.repo_id]
n += 1 n += 1
self.delete_logger.debug("Ending delete repos (deleted: %s)" % n) self.delete_logger.debug("Ending delete repos (queued for deletion: %s)" % n)
def start_currency_checker(session, repomgr): def start_currency_checker(session, repomgr):
@ -973,6 +986,15 @@ def start_delete_loop(session, repomgr):
return thread return thread
def start_rmtree_loop(session, repomgr):
subsession = session.subsession()
thread = threading.Thread(name='rmtreeLoop',
target=repomgr.rmtreeLoop, args=(subsession,))
thread.setDaemon(True)
thread.start()
return thread
def main(options, session): def main(options, session):
repomgr = RepoManager(options, session) repomgr = RepoManager(options, session)
repomgr.readCurrentRepos() repomgr.readCurrentRepos()
@ -985,12 +1007,12 @@ def main(options, session):
curr_ext_chk_thread = start_external_currency_checker(session, repomgr) curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
regen_thread = start_regen_loop(session, repomgr) regen_thread = start_regen_loop(session, repomgr)
delete_thread = start_delete_loop(session, repomgr) delete_thread = start_delete_loop(session, repomgr)
rmtree_thread = start_rmtree_loop(session, repomgr)
# TODO also move rmtree jobs to threads # TODO also move rmtree jobs to threads
logger.info("Entering main loop") logger.info("Entering main loop")
while True: while True:
try: try:
repomgr.updateRepos() repomgr.updateRepos()
repomgr.checkQueue()
repomgr.printState() repomgr.printState()
repomgr.pruneLocalRepos() repomgr.pruneLocalRepos()
if not curr_chk_thread.is_alive(): if not curr_chk_thread.is_alive():
@ -1005,6 +1027,9 @@ def main(options, session):
if not delete_thread.is_alive(): if not delete_thread.is_alive():
logger.error("Delete thread died. Restarting it.") logger.error("Delete thread died. Restarting it.")
delete_thread = start_delete_loop(session, repomgr) delete_thread = start_delete_loop(session, repomgr)
if not rmtree_thread.is_alive():
logger.error("rmtree thread died. Restarting it.")
rmtree_thread = start_rmtree_loop(session, repomgr)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.warning("User exit") logger.warning("User exit")
break break