kojira: threaded repo deletion

Fixes: https://pagure.io/koji/issue/2336
This commit is contained in:
Tomas Kopecek 2020-06-26 11:39:56 +02:00
parent 63834751c3
commit 74f18b687e

View file

@ -180,7 +180,7 @@ class ManagedRepo(object):
age = time.time() - max(times)
return age > timeout
def tryDelete(self):
def tryDelete(self, logger):
"""Remove the repo from disk, if possible"""
path = self.get_path()
if not path:
@ -199,52 +199,55 @@ class ManagedRepo(object):
if e.errno == 2:
# No such file or directory, so the repo either never existed,
# or has already been deleted, so allow it to be marked deleted.
self.logger.info("Repo directory does not exist: %s" % path)
logger.info("Repo directory does not exist: %s" % path)
pass
else:
self.logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))
logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))
return False
else:
times = [self.event_ts, mtime, self.expire_ts]
times = [ts for ts in times if ts is not None]
age = time.time() - max(times)
self.logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)
logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)
if age < lifetime:
return False
self.logger.debug("Attempting to delete repo %s.." % self.repo_id)
logger.debug("Attempting to delete repo %s.." % self.repo_id)
if self.state != koji.REPO_EXPIRED:
raise koji.GenericError("Repo not expired")
if self.session.repoDelete(self.repo_id) > 0:
# cannot delete, we are referenced by a buildroot
self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
return False
self.logger.info("Deleted repo %s" % self.repo_id)
logger.info("Deleted repo %s" % self.repo_id)
self.state = koji.REPO_DELETED
if os.path.islink(path):
# expected for repos on other volumes
info = self.get_info()
if not os.path.exists(path):
self.logger.error('Repo volume link broken: %s', path)
logger.error('Repo volume link broken: %s', path)
return False
if not info or 'volume' not in info:
self.logger.error('Missing repo.json in %s', path)
logger.error('Missing repo.json in %s', path)
return False
realpath = self.get_path(volume=info['volume'])
if not os.path.exists(realpath):
self.logger.error('Repo real path missing: %s', realpath)
logger.error('Repo real path missing: %s', realpath)
return False
if not os.path.samefile(path, realpath):
self.logger.error('Incorrect volume link: %s', path)
logger.error('Incorrect volume link: %s', path)
return False
# ok, try to remove the symlink
try:
os.unlink(path)
except OSError:
self.logger.error('Unable to remove volume link: %s', path)
# and remove the real path
self.manager.rmtree(realpath)
logger.error('Unable to remove volume link: %s', path)
else:
self.manager.rmtree(path)
realpath = path
try:
rmtree(realpath)
except BaseException:
logger.error(''.join(traceback.format_exception(*sys.exc_info())))
return True
def ready(self):
@ -519,6 +522,21 @@ class RepoManager(object):
finally:
session.logout()
def deleteLoop(self, session):
"""Triggers regens as needed/possible. Runs in a separate thread"""
self.session = session
self.delete_logger = logging.getLogger("koji.repo.delete")
self.delete_logger.info('deleteLoop starting')
try:
while True:
self.deleteRepos()
time.sleep(self.options.sleeptime)
except Exception:
self.delete_logger.exception('Error in delete thread')
raise
finally:
session.logout()
def pruneLocalRepos(self):
for volinfo in self.session.listVolumes():
volumedir = pathinfo.volumedir(volinfo['name'])
@ -689,17 +707,6 @@ class RepoManager(object):
time_expired = time.time() - tag['expire_ts']
f.write(fmt % (tag['taginfo']['name'], int(time_expired), int(tag['score'])))
# trigger deletes
n_deletes = 0
for repo in to_list(self.repos.values()):
if n_deletes >= self.options.delete_batch_size:
break
if repo.expired():
# try to delete
if repo.tryDelete():
n_deletes += 1
del self.repos[repo.repo_id]
def checkTasks(self):
"""Check on newRepo tasks
@ -850,12 +857,12 @@ class RepoManager(object):
order = sorted(self.needed_tags.values(), key=lambda t: t['score'], reverse=True)
for tag in order:
if running_tasks >= self.options.max_repo_tasks:
self.logger.debug("Running tasks (%s): %s" % list(running_tasks))
self.logger.debug("Running tasks (%s): %s" % (running_tasks, list(self.tasks)))
self.logger.info("Maximum number of repo tasks reached")
return
elif len(self.tasks) + len(self.other_tasks) >= self.options.repo_tasks_limit:
self.logger.debug("Tracked tasks (%s): %s" % list(self.tasks))
self.logger.debug("Untracked tasks (%s): %s" % list(self.other_tasks))
self.logger.debug("Tracked tasks (%s): %s" % (len(self.tasks), list(self.tasks)))
self.logger.debug("Untracked tasks (%s): %s" % (len(self.other_tasks), list(self.other_tasks)))
self.logger.info("Repo task limit reached")
return
tagname = tag['taginfo']['name']
@ -905,6 +912,18 @@ class RepoManager(object):
if running_tasks_maven >= self.options.max_repo_tasks_maven:
self.logger.info("Maximum number of maven repo tasks reached")
def deleteRepos(self):
# trigger deletes
self.delete_logger.debug("Starting delete repos")
n = 0
for repo in to_list(self.repos.values()):
if repo.expired():
# try to delete
if repo.tryDelete(self.delete_logger):
del self.repos[repo.repo_id]
n += 1
self.delete_logger.debug("Ending delete repos (deleted: %s)" % n)
def start_currency_checker(session, repomgr):
subsession = session.subsession()
@ -933,6 +952,15 @@ def start_regen_loop(session, repomgr):
return thread
def start_delete_loop(session, repomgr):
subsession = session.subsession()
thread = threading.Thread(name='deleteLoop',
target=repomgr.deleteLoop, args=(subsession,))
thread.setDaemon(True)
thread.start()
return thread
def main(options, session):
repomgr = RepoManager(options, session)
repomgr.readCurrentRepos()
@ -944,6 +972,7 @@ def main(options, session):
if options.check_external_repos:
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
regen_thread = start_regen_loop(session, repomgr)
delete_thread = start_delete_loop(session, repomgr)
# TODO also move rmtree jobs to threads
logger.info("Entering main loop")
while True:
@ -961,6 +990,9 @@ def main(options, session):
if not regen_thread.is_alive():
logger.error("Regeneration thread died. Restarting it.")
regen_thread = start_regen_loop(session, repomgr)
if not delete_thread.is_alive():
logger.error("Delete thread died. Restarting it.")
delete_thread = start_delete_loop(session, repomgr)
except KeyboardInterrupt:
logger.warning("User exit")
break