partial refactor of kojira code
This commit is contained in:
parent
04635ac9f2
commit
42dc03b3d5
1 changed files with 163 additions and 0 deletions
163
util/kojira
163
util/kojira
|
|
@ -631,6 +631,169 @@ class RepoManager(object):
|
|||
n_deletes += 1
|
||||
del self.repos[repo.repo_id]
|
||||
|
||||
def checkTasks(self):
|
||||
"""Check on newRepo tasks
|
||||
|
||||
- update taskinfo
|
||||
- remove finished tasks
|
||||
- check for other newRepo tasks (not generated by us)
|
||||
"""
|
||||
|
||||
task_ids = list(self.tasks)
|
||||
self.session.multicall = True
|
||||
for task_id in task_ids:
|
||||
self.session.getTaskInfo(task_id)
|
||||
for task_id, tinfo in zip(task_ids, self.session.multiCall(strict=True)):
|
||||
tstate = koji.TASK_STATES[tinfo['state']]
|
||||
tag_id = self.tasks[task_id]['tag_id']
|
||||
if tstate == 'CLOSED':
|
||||
self.logger.info("Finished: newRepo task %s for tag %s" % (task_id, tag_id))
|
||||
del self.tasks[task_id]
|
||||
elif tstate in ('CANCELED', 'FAILED'):
|
||||
self.logger.info("Problem: newRepo task %s for tag %s is %s" % (task_id, tag_id, tstate))
|
||||
del self.tasks[task_id]
|
||||
else:
|
||||
self.tasks[task_id]['taskinfo'] = tinfo
|
||||
# TODO: implement a timeout
|
||||
|
||||
# also check other newRepo tasks
|
||||
repo_tasks = self.session.listTasks(opts={'method':'newRepo',
|
||||
'state':([koji.TASK_STATES[s] for s in ('FREE', 'OPEN')])})
|
||||
others = [t for t in repo_tasks if t['id'] not in self.tasks]
|
||||
for tinfo in others:
|
||||
if tinfo['id'] not in self.other_tasks:
|
||||
self.logger.info("Untracked newRepo task: %(id)i", tinfo)
|
||||
# note: possible race here, but only a log message
|
||||
# TODO - determine tag and maven support
|
||||
self.other_tasks = dict([(t['id'], t) for t in others])
|
||||
|
||||
def checkNeeded(self):
|
||||
"""Determine which tags currently need regeneration"""
|
||||
|
||||
tags = {}
|
||||
for target in self.session.getBuildTargets():
|
||||
tag_id = target['build_tag']
|
||||
tags[tag_id] = target['build_tag_name']
|
||||
#index repos by tag
|
||||
tag_repos = {}
|
||||
for repo in self.repos.values():
|
||||
tag_repos.setdefault(repo.tag_id, []).append(repo)
|
||||
self.logger.debug("Needed tags: %r" % tags.keys())
|
||||
self.logger.debug("Current tags: %r" % tag_repos.keys())
|
||||
|
||||
for tag_id in tags.iterkeys():
|
||||
covered = False
|
||||
for repo in tag_repos.get(tag_id, []):
|
||||
if repo.current:
|
||||
covered = True
|
||||
break
|
||||
elif repo.pending():
|
||||
#one on the way
|
||||
covered = True
|
||||
break
|
||||
if covered:
|
||||
continue
|
||||
if tag_id in self.needed_tags:
|
||||
# we already know
|
||||
continue
|
||||
|
||||
# we haven't noted this need yet
|
||||
taginfo = self.session.getTag(tag_id)
|
||||
# (not using the caching version since we only call upon discovery)
|
||||
if not taginfo:
|
||||
self.logger.warning('Tag disappeared: %i', tag_id)
|
||||
continue
|
||||
self.logger.info('Tag needs regen: %(name)s', taginfo)
|
||||
|
||||
# how expired are we?
|
||||
ts = 0
|
||||
for repo in tag_repos.get(tag_id, []):
|
||||
if repo.expire_ts:
|
||||
if repo.expire_ts > ts:
|
||||
ts = repo.expire_ts
|
||||
else:
|
||||
self.logger.warning("No expire timestamp for repo: %s", repo.repo_id)
|
||||
if ts == 0:
|
||||
ts = time.time()
|
||||
|
||||
self.needed_tags[tag_id] = {
|
||||
'taginfo': taginfo,
|
||||
'expire_ts': ts,
|
||||
'needed_since' : time.time(),
|
||||
}
|
||||
|
||||
# TODO - prioritize
|
||||
# regen = self.adjustRegenOrder(regen)
|
||||
# self.logger.debug("order: %s", regen)
|
||||
|
||||
def regenRepos(self):
|
||||
"""Trigger newRepo tasks for needed tags"""
|
||||
|
||||
# first note currently running tasks
|
||||
running_tasks = 0
|
||||
running_tasks_maven = 0
|
||||
for task in self.tasks.values():
|
||||
if task['taskinfo']['waiting']:
|
||||
self.logger.debug("Task %(id)i is waiting", task)
|
||||
else:
|
||||
# The largest hub impact is from the first part of the newRepo
|
||||
# task. Once it is waiting on subtasks, that part is over
|
||||
running_tasks += 1
|
||||
if task('maven'):
|
||||
running_tasks_maven += 1
|
||||
|
||||
if running_tasks >= self.options.max_repo_tasks:
|
||||
self.logger.info("Maximum number of repo tasks reached")
|
||||
return
|
||||
elif (len(self.tasks) + len(self.other_tasks)
|
||||
>= self.options.repo_tasks_limit):
|
||||
self.logger.info("Repo task limit reached")
|
||||
return
|
||||
debuginfo_pat = self.options.debuginfo_tags.split()
|
||||
src_pat = self.options.source_tags.split()
|
||||
order = self.needed_tags.values()
|
||||
order.sort(key=lambda t:t['score'])
|
||||
for tag in order:
|
||||
tagname = tag['taginfo']['name']
|
||||
task_id = tag.get('task_id')
|
||||
if task_id:
|
||||
if task_id in self.tasks:
|
||||
# we already have a task
|
||||
continue
|
||||
else:
|
||||
# should not happen
|
||||
logger.warning('Needed tag refers to unknown task. '
|
||||
'%s -> %i', tagname, task_id)
|
||||
taskopts = {}
|
||||
if koji.util.multi_fnmatch(tagname, debuginfo_pat):
|
||||
taskopts['debuginfo'] = True
|
||||
if koji.util.multi_fnmatch(tagname, src_pat):
|
||||
taskopts['src'] = True
|
||||
maven = tag['taginfo']['maven_support']
|
||||
if maven:
|
||||
if running_tasks_maven >= self.options.max_repo_tasks_maven:
|
||||
continue
|
||||
task_id = self.session.newRepo(tagname, **taskopts)
|
||||
running_tasks += 1
|
||||
if maven:
|
||||
running_tasks_maven += 1
|
||||
expire_ts = tag['expire_ts']
|
||||
if expire_ts == 0: # can this still happen?
|
||||
time_expired = '???'
|
||||
else:
|
||||
time_expired = "%.1f" % (time.time() - expire_ts)
|
||||
self.logger.info("Created newRepo task %s for tag %s (%s), expired"
|
||||
"for %s sec", task_id, tag['id'], tag['name'], time_expired)
|
||||
self.tasks[task_id] = {
|
||||
'taskinfo': self.session.getTaskInfo(task_id),
|
||||
'tag_id': tag['taginfo']['id'],
|
||||
'maven': maven,
|
||||
}
|
||||
tag['task_id'] = task_id
|
||||
if running_tasks_maven >= self.options.max_repo_tasks_maven:
|
||||
self.logger.info("Maximum number of maven repo tasks reached")
|
||||
|
||||
|
||||
def start_currency_checker(session, repomgr):
|
||||
subsession = session.subsession()
|
||||
thread = threading.Thread(name='currencyChecker',
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue