PR#2340: kojira: threaded repo deletion
Merges #2340 https://pagure.io/koji/pull-request/2340 Fixes: #2336 https://pagure.io/koji/issue/2336 kojira: thread for repo deletion
This commit is contained in:
commit
ee93da4177
1 changed files with 75 additions and 32 deletions
107
util/kojira
107
util/kojira
|
|
@ -64,7 +64,7 @@ def getTag(session, tag, event=None):
|
|||
|
||||
class ManagedRepo(object):
|
||||
|
||||
def __init__(self, manager, data):
|
||||
def __init__(self, manager, data, repodata=None):
|
||||
self.manager = manager
|
||||
self.session = manager.session
|
||||
self.options = manager.options
|
||||
|
|
@ -81,7 +81,7 @@ class ManagedRepo(object):
|
|||
self.expire_ts = None
|
||||
if koji.REPO_STATES[self.state] in ['EXPIRED', 'DELETED', 'PROBLEM']:
|
||||
self.current = False
|
||||
self.expire_ts = time.time()
|
||||
self._find_expire_time(repodata)
|
||||
# TODO use hub data to find the actual expiration time
|
||||
self.first_seen = time.time()
|
||||
if self.current:
|
||||
|
|
@ -92,6 +92,17 @@ class ManagedRepo(object):
|
|||
tags[x['parent_id']] = 1
|
||||
self.taglist = to_list(tags.keys())
|
||||
|
||||
def _find_expire_time(self, repodata):
|
||||
# find all newer repos for same tag and set oldest as expire_ts for our repo
|
||||
if repodata:
|
||||
repos = [r for r in repodata if
|
||||
r['tag_id'] == self.tag_id and r['create_event'] > self.event_id]
|
||||
if repos:
|
||||
invalidated_by = sorted(repos, key=lambda x: x['create_event'])
|
||||
self.expire_ts = invalidated_by[0]['create_ts']
|
||||
if not self.expire_ts:
|
||||
self.expire_ts = time.time()
|
||||
|
||||
@property
|
||||
def dist(self):
|
||||
# TODO: remove this indirection once we can rely on the hub to return
|
||||
|
|
@ -180,7 +191,7 @@ class ManagedRepo(object):
|
|||
age = time.time() - max(times)
|
||||
return age > timeout
|
||||
|
||||
def tryDelete(self):
|
||||
def tryDelete(self, logger):
|
||||
"""Remove the repo from disk, if possible"""
|
||||
path = self.get_path()
|
||||
if not path:
|
||||
|
|
@ -199,52 +210,55 @@ class ManagedRepo(object):
|
|||
if e.errno == 2:
|
||||
# No such file or directory, so the repo either never existed,
|
||||
# or has already been deleted, so allow it to be marked deleted.
|
||||
self.logger.info("Repo directory does not exist: %s" % path)
|
||||
logger.info("Repo directory does not exist: %s" % path)
|
||||
pass
|
||||
else:
|
||||
self.logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))
|
||||
logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))
|
||||
return False
|
||||
else:
|
||||
times = [self.event_ts, mtime, self.expire_ts]
|
||||
times = [ts for ts in times if ts is not None]
|
||||
age = time.time() - max(times)
|
||||
self.logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)
|
||||
logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)
|
||||
if age < lifetime:
|
||||
return False
|
||||
self.logger.debug("Attempting to delete repo %s.." % self.repo_id)
|
||||
logger.debug("Attempting to delete repo %s.." % self.repo_id)
|
||||
if self.state != koji.REPO_EXPIRED:
|
||||
raise koji.GenericError("Repo not expired")
|
||||
if self.session.repoDelete(self.repo_id) > 0:
|
||||
# cannot delete, we are referenced by a buildroot
|
||||
self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
|
||||
logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
|
||||
return False
|
||||
self.logger.info("Deleted repo %s" % self.repo_id)
|
||||
logger.info("Deleted repo %s" % self.repo_id)
|
||||
self.state = koji.REPO_DELETED
|
||||
if os.path.islink(path):
|
||||
# expected for repos on other volumes
|
||||
info = self.get_info()
|
||||
if not os.path.exists(path):
|
||||
self.logger.error('Repo volume link broken: %s', path)
|
||||
logger.error('Repo volume link broken: %s', path)
|
||||
return False
|
||||
if not info or 'volume' not in info:
|
||||
self.logger.error('Missing repo.json in %s', path)
|
||||
logger.error('Missing repo.json in %s', path)
|
||||
return False
|
||||
realpath = self.get_path(volume=info['volume'])
|
||||
if not os.path.exists(realpath):
|
||||
self.logger.error('Repo real path missing: %s', realpath)
|
||||
logger.error('Repo real path missing: %s', realpath)
|
||||
return False
|
||||
if not os.path.samefile(path, realpath):
|
||||
self.logger.error('Incorrect volume link: %s', path)
|
||||
logger.error('Incorrect volume link: %s', path)
|
||||
return False
|
||||
# ok, try to remove the symlink
|
||||
try:
|
||||
os.unlink(path)
|
||||
except OSError:
|
||||
self.logger.error('Unable to remove volume link: %s', path)
|
||||
# and remove the real path
|
||||
self.manager.rmtree(realpath)
|
||||
logger.error('Unable to remove volume link: %s', path)
|
||||
else:
|
||||
self.manager.rmtree(path)
|
||||
realpath = path
|
||||
try:
|
||||
rmtree(realpath)
|
||||
except BaseException:
|
||||
logger.error(''.join(traceback.format_exception(*sys.exc_info())))
|
||||
|
||||
return True
|
||||
|
||||
def ready(self):
|
||||
|
|
@ -371,7 +385,7 @@ class RepoManager(object):
|
|||
else:
|
||||
self.logger.info('Found repo %s, state=%s'
|
||||
% (repo_id, koji.REPO_STATES[data['state']]))
|
||||
repo = ManagedRepo(self, data)
|
||||
repo = ManagedRepo(self, data, repodata)
|
||||
self.repos[repo_id] = repo
|
||||
if not getTag(self.session, repo.tag_id) and not repo.expired():
|
||||
self.logger.info('Tag %d for repo %d disappeared, expiring.', repo.tag_id, repo_id)
|
||||
|
|
@ -519,6 +533,21 @@ class RepoManager(object):
|
|||
finally:
|
||||
session.logout()
|
||||
|
||||
def deleteLoop(self, session):
|
||||
"""Triggers regens as needed/possible. Runs in a separate thread"""
|
||||
self.session = session
|
||||
self.delete_logger = logging.getLogger("koji.repo.delete")
|
||||
self.delete_logger.info('deleteLoop starting')
|
||||
try:
|
||||
while True:
|
||||
self.deleteRepos()
|
||||
time.sleep(self.options.sleeptime)
|
||||
except Exception:
|
||||
self.delete_logger.exception('Error in delete thread')
|
||||
raise
|
||||
finally:
|
||||
session.logout()
|
||||
|
||||
def pruneLocalRepos(self):
|
||||
for volinfo in self.session.listVolumes():
|
||||
volumedir = pathinfo.volumedir(volinfo['name'])
|
||||
|
|
@ -689,17 +718,6 @@ class RepoManager(object):
|
|||
time_expired = time.time() - tag['expire_ts']
|
||||
f.write(fmt % (tag['taginfo']['name'], int(time_expired), int(tag['score'])))
|
||||
|
||||
# trigger deletes
|
||||
n_deletes = 0
|
||||
for repo in to_list(self.repos.values()):
|
||||
if n_deletes >= self.options.delete_batch_size:
|
||||
break
|
||||
if repo.expired():
|
||||
# try to delete
|
||||
if repo.tryDelete():
|
||||
n_deletes += 1
|
||||
del self.repos[repo.repo_id]
|
||||
|
||||
def checkTasks(self):
|
||||
"""Check on newRepo tasks
|
||||
|
||||
|
|
@ -850,12 +868,12 @@ class RepoManager(object):
|
|||
order = sorted(self.needed_tags.values(), key=lambda t: t['score'], reverse=True)
|
||||
for tag in order:
|
||||
if running_tasks >= self.options.max_repo_tasks:
|
||||
self.logger.debug("Running tasks (%s): %s" % list(running_tasks))
|
||||
self.logger.debug("Running tasks (%s): %s" % (running_tasks, list(self.tasks)))
|
||||
self.logger.info("Maximum number of repo tasks reached")
|
||||
return
|
||||
elif len(self.tasks) + len(self.other_tasks) >= self.options.repo_tasks_limit:
|
||||
self.logger.debug("Tracked tasks (%s): %s" % list(self.tasks))
|
||||
self.logger.debug("Untracked tasks (%s): %s" % list(self.other_tasks))
|
||||
self.logger.debug("Tracked tasks (%s): %s" % (len(self.tasks), list(self.tasks)))
|
||||
self.logger.debug("Untracked tasks (%s): %s" % (len(self.other_tasks), list(self.other_tasks)))
|
||||
self.logger.info("Repo task limit reached")
|
||||
return
|
||||
tagname = tag['taginfo']['name']
|
||||
|
|
@ -905,6 +923,18 @@ class RepoManager(object):
|
|||
if running_tasks_maven >= self.options.max_repo_tasks_maven:
|
||||
self.logger.info("Maximum number of maven repo tasks reached")
|
||||
|
||||
def deleteRepos(self):
|
||||
# trigger deletes
|
||||
self.delete_logger.debug("Starting delete repos")
|
||||
n = 0
|
||||
for repo in to_list(self.repos.values()):
|
||||
if repo.expired():
|
||||
# try to delete
|
||||
if repo.tryDelete(self.delete_logger):
|
||||
del self.repos[repo.repo_id]
|
||||
n += 1
|
||||
self.delete_logger.debug("Ending delete repos (deleted: %s)" % n)
|
||||
|
||||
|
||||
def start_currency_checker(session, repomgr):
|
||||
subsession = session.subsession()
|
||||
|
|
@ -933,6 +963,15 @@ def start_regen_loop(session, repomgr):
|
|||
return thread
|
||||
|
||||
|
||||
def start_delete_loop(session, repomgr):
|
||||
subsession = session.subsession()
|
||||
thread = threading.Thread(name='deleteLoop',
|
||||
target=repomgr.deleteLoop, args=(subsession,))
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
return thread
|
||||
|
||||
|
||||
def main(options, session):
|
||||
repomgr = RepoManager(options, session)
|
||||
repomgr.readCurrentRepos()
|
||||
|
|
@ -944,6 +983,7 @@ def main(options, session):
|
|||
if options.check_external_repos:
|
||||
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
|
||||
regen_thread = start_regen_loop(session, repomgr)
|
||||
delete_thread = start_delete_loop(session, repomgr)
|
||||
# TODO also move rmtree jobs to threads
|
||||
logger.info("Entering main loop")
|
||||
while True:
|
||||
|
|
@ -961,6 +1001,9 @@ def main(options, session):
|
|||
if not regen_thread.is_alive():
|
||||
logger.error("Regeneration thread died. Restarting it.")
|
||||
regen_thread = start_regen_loop(session, repomgr)
|
||||
if not delete_thread.is_alive():
|
||||
logger.error("Delete thread died. Restarting it.")
|
||||
delete_thread = start_delete_loop(session, repomgr)
|
||||
except KeyboardInterrupt:
|
||||
logger.warning("User exit")
|
||||
break
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue