PR#797: Move kojira's regen loop into dedicated thread

Merges #797
https://pagure.io/koji/pull-request/797

Fixes: #798
https://pagure.io/koji/issue/798
kojira too slow on some systems
This commit is contained in:
Mike McLean 2018-02-13 13:48:49 -05:00
commit 0087888fd4
4 changed files with 435 additions and 176 deletions

View file

View file

@ -0,0 +1,8 @@
from __future__ import absolute_import
import os
# TODO - libify kojira so we don't need this hack
# kojira is installed as an extension-less executable script, so it cannot be
# imported with a normal import statement; locate it relative to this test
# package and load it by path instead.
CLI_FILENAME = os.path.dirname(__file__) + "/../../util/kojira"
import imp
# expose the loaded script as the module attribute `kojira` for the tests
kojira = imp.load_source('kojira', CLI_FILENAME)

View file

@ -0,0 +1,168 @@
from __future__ import absolute_import
import mock
import time
import unittest
import koji
from . import loadkojira
kojira = loadkojira.kojira
class OurException(Exception):
    """Distinct exception type the tests raise to break out of loops."""
class RepoManagerTest(unittest.TestCase):
    """Unit tests for kojira's RepoManager (hub access is fully mocked)."""

    def setUp(self):
        self.session = mock.MagicMock()
        self.options = mock.MagicMock()
        self.mgr = kojira.RepoManager(self.options, self.session)

    def tearDown(self):
        mock.patch.stopall()

    def test_check_tasks_none(self):
        # no tracked tasks and no newRepo tasks on the hub: nothing changes
        self.mgr.tasks = {}
        self.mgr.other_tasks = {}
        self.session.listTasks.return_value = []
        self.mgr.checkTasks()
        self.session.getTaskInfo.assert_not_called()
        self.assertEqual(self.mgr.tasks, {})
        self.assertEqual(self.mgr.other_tasks, {})

    def test_check_tasks_other(self):
        # an untracked newRepo task is noted and logged exactly once
        self.mgr.tasks = {}
        self.mgr.other_tasks = {}
        self.session.listTasks.return_value = [
            {
                'id': 1294,
                'method': 'newRepo',
                'state': 0,
                'waiting': None,
            },
        ]
        self.mgr.logger = mock.MagicMock()
        self.mgr.checkTasks()
        self.session.getTaskInfo.assert_not_called()
        self.assertEqual(self.mgr.tasks, {})
        self.assertEqual(len(self.mgr.other_tasks), 1)
        self.mgr.logger.info.assert_called_once()
        # the extra task should not be logged if we run again
        self.mgr.logger.reset_mock()
        self.mgr.checkTasks()
        self.mgr.logger.info.assert_not_called()

    def test_check_tasks_ours(self):
        tasks = [
            {'id': 101, 'state': koji.TASK_STATES['FREE']},
            {'id': 102, 'state': koji.TASK_STATES['OPEN']},
            {'id': 103, 'state': koji.TASK_STATES['CLOSED']},
            {'id': 104, 'state': koji.TASK_STATES['CANCELED']},
            {'id': 105, 'state': koji.TASK_STATES['FAILED']},
        ]
        task_idx = dict([(t['id'], t) for t in tasks])
        order = []

        def getTaskInfo(task_id):
            # record the order of calls in multicall
            order.append(task_id)

        def multiCall(strict):
            return [[task_idx[tid]] for tid in order]

        self.session.getTaskInfo.side_effect = getTaskInfo
        self.session.multiCall.side_effect = multiCall
        self.mgr.tasks = dict([
            (t['id'], {'taskinfo': t, 'tag_id': 'TAG'})
            for t in tasks])
        self.mgr.other_tasks = {}
        self.session.listTasks.return_value = []
        self.mgr.checkTasks()
        # should have removed the closed/canceled/failed tasks
        # (sort the keys: dict ordering is not guaranteed, and on python 3
        # keys() returns a view, not a list)
        self.assertEqual(sorted(self.mgr.tasks.keys()), [101, 102])

    @mock.patch('time.sleep')
    def test_regen_loop(self, sleep):
        subsession = mock.MagicMock()
        self.mgr.regenRepos = mock.MagicMock()
        self.mgr.regenRepos.side_effect = [None] * 10 + [OurException()]
        # we need the exception to terminate the infinite loop
        with self.assertRaises(OurException):
            self.mgr.regenLoop(subsession)
        self.assertEqual(self.mgr.regenRepos.call_count, 11)
        subsession.logout.assert_called_once()

    def test_set_tag_score(self):
        self.mgr.tagUseStats = mock.MagicMock()
        self.mgr.tagUseStats.return_value = {
            'n_recent': 5
        }
        self.mgr.needed_tags = {}
        entry = {
            'taginfo': {
                'id': 'TAGID',
                'name': 'TAGNAME',
            },
            'expire_ts': time.time() - 300
        }
        self.mgr.setTagScore(entry)
        score = entry['score']
        self.assertGreaterEqual(score, 0.0)
        # a tag whose repo expired longer ago should not score lower
        _entry = entry.copy()
        _entry['expire_ts'] -= 300
        self.mgr.setTagScore(_entry)
        # compare the rescored copy, not the original entry
        self.assertGreaterEqual(_entry['score'], score)
        # higher recent use should not lower the score either
        self.mgr.tagUseStats.return_value = {
            'n_recent': 10
            # higher than before
        }
        self.mgr.setTagScore(entry)
        self.assertGreaterEqual(entry['score'], score)

    def test_check_needed(self):
        self.session.getBuildTargets.return_value = [
            {'build_tag': 1},
            {'build_tag': 2},
            {'build_tag': 3},
        ]
        # make two repo entries
        repo1 = mock.MagicMock()
        repo1.tag_id = 1
        repo1.current = True
        repo2 = mock.MagicMock()
        repo2.tag_id = 2
        repo2.current = False
        repo2.pending.return_value = True
        self.mgr.repos = {1: repo1, 2: repo2}

        # more mocks
        def my_get_tag(tag_id):
            return {'id': tag_id, 'name': 'TAG %i' % tag_id}

        self.session.getTag.side_effect = my_get_tag
        self.mgr.logger = mock.MagicMock()
        self.mgr.setTagScore = mock.MagicMock()
        with mock.patch('time.time') as mytime:
            # first call -> expire_ts fallback, second -> needed_since
            mytime.side_effect = [1000, 1100]
            self.mgr.checkNeeded()
        # only the third tag should show up as needed
        expected = {
            3: {
                'expire_ts': 1000,
                'needed_since': 1100,
                'taginfo': {
                    'id': 3,
                    'name': 'TAG 3',
                },
            },
        }
        self.assertEqual(self.mgr.needed_tags, expected)

View file

@ -27,7 +27,6 @@ from koji.util import rmtree, parseStatus
from optparse import OptionParser
from ConfigParser import ConfigParser
import errno
import fnmatch
import logging
import logging.handlers
import pprint
@ -195,14 +194,26 @@ class RepoManager(object):
def __init__(self, options, session):
    """Track repos, regen tasks and delete jobs for the kojira daemon.

    :param options: parsed config options (sleeptime, limits, etc.)
    :param session: hub session for the calling (main) thread
    """
    self.options = options
    # the thread-local store MUST exist before the session property setter
    # is ever used, since the setter writes into it; the stray plain
    # `self.session = session` assignment (a leftover pre-threading line)
    # would hit the setter first and raise AttributeError, so it is dropped
    self._local = threading.local()
    self._local.session = session
    self.repos = {}             # repo_id -> ManagedRepo
    self.tasks = {}             # task_id -> {'taskinfo', 'tag_id', 'maven'}
    self.other_tasks = {}       # newRepo tasks we did not create
    self.needed_tags = {}       # tag_id -> entry needing regen
    self.tag_use_stats = {}     # cached buildroot-use stats per tag
    self.delete_pids = {}       # pid -> repo delete subprocess
    self.delete_queue = []
    self.logger = logging.getLogger("koji.repo.manager")
@property
def session(self):
    # session is stored in our threadlocal instance
    # each worker thread (currency checker, regen loop, main) installs its
    # own hub subsession, so concurrent calls never share one connection
    return self._local.session
@session.setter
def session(self, value):
    # NOTE: self._local must already exist (created in __init__) before
    # this setter can run
    self._local.session = value
def printState(self):
self.logger.debug('Tracking %i repos, %i child processes', len(self.repos), len(self.delete_pids))
for tag_id, task_id in self.tasks.iteritems():
@ -298,10 +309,8 @@ class RepoManager(object):
self.logger.info('Dropping entry for inactive repo: %s', repo_id)
del self.repos[repo_id]
def checkCurrentRepos(self, session=None):
def checkCurrentRepos(self):
"""Determine which repos are current"""
if session is None:
session = self.session
to_check = []
repo_ids = self.repos.keys()
for repo_id in repo_ids:
@ -324,10 +333,8 @@ class RepoManager(object):
self.logger.debug("Skipped check for repos: %r", skipped)
if not to_check:
return
#session.multicall = True
for repo in to_check:
changed = session.tagChangedSinceEvent(repo.event_id, repo.taglist)
#for repo, [changed] in zip(to_check, session.multiCall(strict=True)):
changed = self.session.tagChangedSinceEvent(repo.event_id, repo.taglist)
if changed:
self.logger.info("Repo %i no longer current", repo.repo_id)
repo.current = False
@ -335,15 +342,31 @@ class RepoManager(object):
def currencyChecker(self, session):
    """Continually checks repos for currency. Runs as a separate thread

    :param session: dedicated hub subsession for this thread; always
                    logged out when the loop exits

    The diff-merged text duplicated the old and new loop bodies (a nested
    try with two while loops, the second unreachable); this is the single
    clean loop.
    """
    # this thread gets its own session and logger
    self.session = session
    self.logger = logging.getLogger("koji.repo.currency")
    self.logger.info('currencyChecker starting')
    try:
        while True:
            self.checkCurrentRepos()
            time.sleep(self.options.sleeptime)
    except:
        # log before propagating so the failure is visible even if the
        # main thread only notices the thread died
        self.logger.exception('Error in currency checker thread')
        raise
    finally:
        session.logout()
def regenLoop(self, session):
    """Triggers regens as needed/possible. Runs in a separate thread"""
    # give this thread its own logger and hub session
    self.logger = logging.getLogger("koji.repo.regen")
    self.session = session
    self.logger.info('regenLoop starting')
    try:
        try:
            while True:
                self.regenRepos()
                time.sleep(self.options.sleeptime)
        except:
            # surface the error in our log, then let it propagate so the
            # main loop can notice the dead thread and restart it
            self.logger.exception('Error in regen thread')
            raise
    finally:
        # always release the subsession, even on error
        session.logout()
@ -363,10 +386,13 @@ class RepoManager(object):
self.logger.debug("%s is not a directory, skipping", tagdir)
continue
for repo_id in os.listdir(tagdir):
if repo_id == 'latest':
# ignore latest symlinks
continue
try:
repo_id = int(repo_id)
except ValueError:
self.logger.debug("%s not an int, skipping", tagdir)
self.logger.debug("%s/%s not an int, skipping", tagdir, repo_id)
continue
repodir = "%s/%s" % (tagdir, repo_id)
if not os.path.isdir(repodir):
@ -374,7 +400,6 @@ class RepoManager(object):
continue
if repo_id in self.repos:
#we're already managing it, no need to deal with it here
self.logger.debug("seen %s already, skipping", repodir)
continue
try:
dir_ts = os.stat(repodir).st_mtime
@ -423,148 +448,152 @@ class RepoManager(object):
self.logger.debug("tag %s recent use count: %i" % (tag_name, len(recent)))
return stats
def setTagScore(self, entry):
    """Set score for needed_tag entry

    We score the tags by two factors
        - age of current repo
        - last use in a buildroot

    Having an older repo or a higher use count gives the tag a higher
    priority for regen. The formula attempts to keep the last use factor
    from overpowering, so that tags with very old repos still get priority

    (The removed adjustRegenOrder() lines that the diff view interleaved
    here have been dropped; this is the replacement scorer.)
    """
    stats = self.tagUseStats(entry['taginfo']['id'])
    # normalize use count against the other needed tags
    # NOTE(review): needed_tags entries are not guaranteed to carry an
    # 'n_recent' key, hence the .get() default -- confirm intended source
    max_n = max([t.get('n_recent', 0) for t in self.needed_tags.values()]
                or [1])
    if max_n == 0:
        # no recent use or missing data
        max_n = 1
    adj = stats['n_recent'] * 9.0 / max_n + 1  # 1.0 to 10.0
    ts = entry['expire_ts']
    age = time.time() - ts
    # XXX - need to make sure our times aren't far off, otherwise this
    # scoring could have the opposite of the desired effect
    if age < 0:
        self.logger.warning("Needed tag has future expire_ts: %r", entry)
        age = 0
    entry['score'] = age * adj
    self.logger.debug("Needed tag %s got score %.2f",
                      entry['taginfo']['name'], entry['score'])
    # so a day old unused repo gets about the same regen score as a
    # 2.4-hour-old, very popular repo
def updateTagScores(self):
    """Recompute the regen score for every tracked needed-tag entry."""
    for tag_entry in self.needed_tags.values():
        self.setTagScore(tag_entry)
def updateRepos(self):
    """One bookkeeping pass: poll tasks, refresh repo state, recompute
    needed tags, and expire/delete stale repos.

    The diff view interleaved the removed task-polling loop (now living in
    checkTasks) with this body; the duplicate old lines are dropped.
    """
    # check on tasks
    self.checkTasks()
    self.logger.debug("Current tasks: %r" % self.tasks)
    if self.other_tasks:
        self.logger.debug("Found %i untracked newRepo tasks",
                          len(self.other_tasks))
    self.logger.debug("Updating repos")
    self.readCurrentRepos()
    # check for stale repos
    for repo in list(self.repos.values()):
        if repo.stale():
            repo.expire()
    # find out which tags require repos
    self.checkNeeded()
    self.updateTagScores()
    # trigger deletes
    n_deletes = 0
    # iterate over a snapshot: we delete entries from self.repos inside
    # the loop, which would break iteration over a live view
    for repo in list(self.repos.values()):
        if n_deletes >= self.options.delete_batch_size:
            break
        if repo.expired():
            # try to delete
            if repo.tryDelete():
                n_deletes += 1
                del self.repos[repo.repo_id]
def checkTasks(self):
    """Check on newRepo tasks
    - update taskinfo
    - remove finished tasks
    - check for other newRepo tasks (not generated by us)
    """
    tracked = list(self.tasks)
    # batch all the taskinfo queries into one multicall round trip
    self.session.multicall = True
    for tid in tracked:
        self.session.getTaskInfo(tid)
    results = self.session.multiCall(strict=True)
    for tid, [tinfo] in zip(tracked, results):
        state_name = koji.TASK_STATES[tinfo['state']]
        tag_id = self.tasks[tid]['tag_id']
        if state_name == 'CLOSED':
            self.logger.info("Finished: newRepo task %s for tag %s" % (tid, tag_id))
            del self.tasks[tid]
        elif state_name in ('CANCELED', 'FAILED'):
            self.logger.info("Problem: newRepo task %s for tag %s is %s" % (tid, tag_id, state_name))
            del self.tasks[tid]
        else:
            # still in flight; remember the fresh taskinfo
            self.tasks[tid]['taskinfo'] = tinfo
            # TODO: implement a timeout
    # also check other newRepo tasks
    repo_tasks = self.session.listTasks(opts={'method': 'newRepo',
            'state': ([koji.TASK_STATES[s] for s in ('FREE', 'OPEN')])})
    untracked = [t for t in repo_tasks if t['id'] not in self.tasks]
    for tinfo in untracked:
        # only log a given untracked task the first time we see it
        if tinfo['id'] not in self.other_tasks:
            self.logger.info("Untracked newRepo task: %(id)i", tinfo)
            # note: possible race here, but only a log message
            # TODO - determine tag and maven support
    self.other_tasks = dict([(t['id'], t) for t in untracked])
def checkNeeded(self):
    """Determine which tags currently need regeneration

    Updates self.needed_tags: adds entries for build tags that have no
    current (or pending) repo, and drops entries that are covered again
    or are no longer build tags. The diff-merged old lines (tags dict,
    expire_times, regen list) are removed; this is the clean body.
    """
    n_need = len(self.needed_tags)
    self.build_tags = set(
        [t['build_tag'] for t in self.session.getBuildTargets()])
    # index repos by tag
    tag_repos = {}
    for repo in self.repos.values():
        tag_repos.setdefault(repo.tag_id, []).append(repo)
    # we need to determine:
    #  - which tags need a new repo
    #  - if any repos seem to be broken
    # (checkCurrentRepos now runs continually in a separate thread)
    for tag_id in self.build_tags:
        covered = False
        for repo in tag_repos.get(tag_id, []):
            if repo.current:
                covered = True
                break
            elif repo.pending():
                # one on the way
                covered = True
                break
        if tag_id in self.needed_tags:
            entry = self.needed_tags[tag_id]
            if covered:
                # no longer needed
                self.logger.info("Tag %(name)s has a current or in "
                                 "progress repo", entry['taginfo'])
                del self.needed_tags[tag_id]
            # if not covered, we already know
            continue
        if covered:
            continue
        # we haven't noted this need yet
        taginfo = self.session.getTag(tag_id)
        # (not using the caching version since we only call upon discovery)
        if not taginfo:
            self.logger.warning('Tag disappeared: %i', tag_id)
            continue
        self.logger.info('Tag needs regen: %(name)s', taginfo)
        # how expired are we? use the newest expiration among its repos
        ts = 0
        for repo in tag_repos.get(tag_id, []):
            if repo.expire_ts:
                if repo.expire_ts > ts:
                    ts = repo.expire_ts
            else:
                self.logger.warning("No expire timestamp for repo: %s",
                                    repo.repo_id)
        if ts == 0:
            # no repo to measure from; treat the need as starting now
            ts = time.time()
        entry = {
            'taginfo': taginfo,
            'expire_ts': ts,
            'needed_since': time.time(),
        }
        self.setTagScore(entry)
        self.needed_tags[tag_id] = entry
    # some cleanup
    for tag_id in list(self.needed_tags):
        entry = self.needed_tags.get(tag_id)
        if tag_id not in self.build_tags:
            self.logger.info("Tag %(name)s is no longer a build tag",
                             entry['taginfo'])
            del self.needed_tags[tag_id]
    for tag_id, repolist in tag_repos.items():
        if tag_id not in self.build_tags:
            # repos for these tags are no longer required
            for repo in repolist:
                if repo.ready():
                    repo.expire()
    if n_need != len(self.needed_tags):
        self.logger.info('Needed tags count went from %i to %i', n_need,
                         len(self.needed_tags))
def regenRepos(self):
    """Trigger newRepo tasks for needed tags

    Respects max_repo_tasks / repo_tasks_limit / max_repo_tasks_maven.
    The trailing removed cleanup loop the diff view appended here now
    lives in updateRepos and is dropped.
    """
    # first note currently running tasks
    running_tasks = 0
    running_tasks_maven = 0
    for task in self.tasks.values():
        if task['taskinfo']['waiting']:
            # use the taskinfo dict for %(id)i -- the tracking entry
            # itself has no 'id' key
            self.logger.debug("Task %(id)i is waiting", task['taskinfo'])
        else:
            # the largest hub impact is from the first part of the newRepo
            # task; once it is waiting on subtasks, that part is over
            running_tasks += 1
            if task['maven']:
                running_tasks_maven += 1
    debuginfo_pat = self.options.debuginfo_tags.split()
    src_pat = self.options.source_tags.split()
    # highest score first, so tags with the oldest (or no) repos get
    # precedence (the plain ascending sort inverted this)
    order = sorted(self.needed_tags.values(),
                   key=lambda t: t['score'], reverse=True)
    for tag in order:
        if running_tasks >= self.options.max_repo_tasks:
            self.logger.info("Maximum number of repo tasks reached")
            return
        elif (len(self.tasks) + len(self.other_tasks)
                >= self.options.repo_tasks_limit):
            self.logger.info("Repo task limit reached")
            return
        tagname = tag['taginfo']['name']
        task_id = tag.get('task_id')
        if task_id:
            if task_id in self.tasks:
                # we already have a task
                continue
            else:
                # should not happen
                self.logger.warning('Needed tag refers to unknown task. '
                                    '%s -> %i', tagname, task_id)
        taskopts = {}
        if koji.util.multi_fnmatch(tagname, debuginfo_pat):
            taskopts['debuginfo'] = True
        if koji.util.multi_fnmatch(tagname, src_pat):
            taskopts['src'] = True
        maven = tag['taginfo']['maven_support']
        if maven:
            if running_tasks_maven >= self.options.max_repo_tasks_maven:
                continue
        task_id = self.session.newRepo(tagname, **taskopts)
        running_tasks += 1
        if maven:
            running_tasks_maven += 1
        expire_ts = tag['expire_ts']
        if expire_ts == 0:  # can this still happen?
            time_expired = '???'
        else:
            time_expired = "%.1f" % (time.time() - expire_ts)
        self.logger.info("Created newRepo task %s for tag %s (%s), "
                         "expired for %s sec", task_id, tag['taginfo']['id'],
                         tag['taginfo']['name'], time_expired)
        self.tasks[task_id] = {
            'taskinfo': self.session.getTaskInfo(task_id),
            'tag_id': tag['taginfo']['id'],
            'maven': maven,
        }
        tag['task_id'] = task_id
    if running_tasks_maven >= self.options.max_repo_tasks_maven:
        self.logger.info("Maximum number of maven repo tasks reached")
def start_currency_checker(session, repomgr):
subsession = session.subsession()
@ -639,6 +708,16 @@ def start_currency_checker(session, repomgr):
thread.start()
return thread
def start_regen_loop(session, repomgr):
    """Spawn the repo regen loop in its own daemon thread.

    :param session: main hub session; a subsession is created for the thread
    :param repomgr: RepoManager whose regenLoop will run
    :returns: the started Thread, so the caller can monitor/restart it
    """
    subsession = session.subsession()
    thread = threading.Thread(name='regenLoop',
                              target=repomgr.regenLoop, args=(subsession,))
    # daemon thread: never block process exit
    # (`daemon` attribute replaces the deprecated setDaemon() alias)
    thread.daemon = True
    thread.start()
    return thread
def main(options, session):
repomgr = RepoManager(options, session)
repomgr.readCurrentRepos()
@ -646,6 +725,7 @@ def main(options, session):
raise SystemExit
signal.signal(signal.SIGTERM,shutdown)
curr_chk_thread = start_currency_checker(session, repomgr)
regen_thread = start_regen_loop(session, repomgr)
# TODO also move rmtree jobs to threads
logger.info("Entering main loop")
repodir = "%s/repos" % pathinfo.topdir
@ -660,6 +740,9 @@ def main(options, session):
if not curr_chk_thread.isAlive():
logger.error("Currency checker thread died. Restarting it.")
curr_chk_thread = start_currency_checker(session, repomgr)
if not regen_thread.isAlive():
logger.error("Currency checker thread died. Restarting it.")
regen_thread = start_regen_loop(session, repomgr)
except KeyboardInterrupt:
logger.warn("User exit")
break
@ -762,7 +845,7 @@ def get_options():
int_opts = ('deleted_repo_lifetime', 'max_repo_tasks', 'repo_tasks_limit',
'retry_interval', 'max_retries', 'offline_retry_interval',
'max_delete_processes', 'max_repo_tasks_maven',
'delete_batch_size', 'dist_repo_lifetime')
'delete_batch_size', 'dist_repo_lifetime', 'sleeptime')
str_opts = ('topdir', 'server', 'user', 'password', 'logfile', 'principal', 'keytab', 'krbservice',
'cert', 'ca', 'serverca', 'debuginfo_tags', 'source_tags') # FIXME: remove ca here
bool_opts = ('with_src','verbose','debug','ignore_stray_repos', 'offline_retry',