Allow password in SCM url with new builder option

Fixes: https://pagure.io/koji/issue/3179
This commit is contained in:
Jana Cupova 2022-01-10 13:48:16 +01:00 committed by Tomas Kopecek
parent bf23bf904e
commit 725c157fbd
3 changed files with 43 additions and 28 deletions

View file

@ -1644,7 +1644,7 @@ class BuildMavenTask(BaseBuildTask):
opts = {} opts = {}
self.opts = opts self.opts = opts
scm = SCM(url) scm = SCM(url, allow_password=self.options.allow_password_in_scm_url)
scm_policy_opts = { scm_policy_opts = {
'user_id': self.taskinfo['owner'], 'user_id': self.taskinfo['owner'],
'channel': self.session.getChannel(self.taskinfo['channel_id'], 'channel': self.session.getChannel(self.taskinfo['channel_id'],
@ -1728,7 +1728,8 @@ class BuildMavenTask(BaseBuildTask):
# Checkout out patches, if present # Checkout out patches, if present
if self.opts.get('patches'): if self.opts.get('patches'):
patchlog = self.workdir + '/patches.log' patchlog = self.workdir + '/patches.log'
patch_scm = SCM(self.opts.get('patches')) patch_scm = SCM(self.opts.get('patches'),
allow_password=self.options.allow_password_in_scm_url)
patch_scm.assert_allowed(allowed=self.options.allowed_scms, patch_scm.assert_allowed(allowed=self.options.allowed_scms,
session=self.session, session=self.session,
by_config=self.options.allowed_scms_use_config, by_config=self.options.allowed_scms_use_config,
@ -2015,7 +2016,7 @@ class WrapperRPMTask(BaseBuildTask):
# can't happen # can't happen
assert False # pragma: no cover assert False # pragma: no cover
scm = SCM(spec_url) scm = SCM(spec_url, allow_password=self.options.allow_password_in_scm_url)
scm.assert_allowed(allowed=self.options.allowed_scms, scm.assert_allowed(allowed=self.options.allowed_scms,
session=self.session, session=self.session,
by_config=self.options.allowed_scms_use_config, by_config=self.options.allowed_scms_use_config,
@ -3016,7 +3017,7 @@ class ImageTask(BaseTaskHandler):
koji.ensuredir(scmdir) koji.ensuredir(scmdir)
self.logger.debug("ksfile = %s" % ksfile) self.logger.debug("ksfile = %s" % ksfile)
if self.opts.get('ksurl'): if self.opts.get('ksurl'):
scm = SCM(self.opts['ksurl']) scm = SCM(self.opts['ksurl'], allow_password=self.options.allow_password_in_scm_url)
scm.assert_allowed(allowed=self.options.allowed_scms, scm.assert_allowed(allowed=self.options.allowed_scms,
session=self.session, session=self.session,
by_config=self.options.allowed_scms_use_config, by_config=self.options.allowed_scms_use_config,
@ -3499,7 +3500,7 @@ class LiveMediaTask(ImageTask):
An absolute path (from within the chroot) to where livemedia-creator An absolute path (from within the chroot) to where livemedia-creator
can find the checked out templates. can find the checked out templates.
""" """
scm = SCM(self.opts['lorax_url']) scm = SCM(self.opts['lorax_url'], allow_password=self.options.allow_password_in_scm_url)
scm.assert_allowed(allowed=self.options.allowed_scms, scm.assert_allowed(allowed=self.options.allowed_scms,
session=self.session, session=self.session,
by_config=self.options.allowed_scms_use_config, by_config=self.options.allowed_scms_use_config,
@ -3755,7 +3756,7 @@ class OzImageTask(BaseTaskHandler):
ksfile = self.opts.get('kickstart') ksfile = self.opts.get('kickstart')
self.logger.debug("ksfile = %s" % ksfile) self.logger.debug("ksfile = %s" % ksfile)
if self.opts.get('ksurl'): if self.opts.get('ksurl'):
scm = SCM(self.opts['ksurl']) scm = SCM(self.opts['ksurl'], allow_password=self.options.allow_password_in_scm_url)
scm.assert_allowed(allowed=self.options.allowed_scms, scm.assert_allowed(allowed=self.options.allowed_scms,
session=self.session, session=self.session,
by_config=self.options.allowed_scms_use_config, by_config=self.options.allowed_scms_use_config,
@ -4591,7 +4592,7 @@ class BuildIndirectionImageTask(OzImageTask):
# to be entirely self contained. Revisit if anyone feels like a refactor. # to be entirely self contained. Revisit if anyone feels like a refactor.
self.logger.debug("filepath = %s" % filepath) self.logger.debug("filepath = %s" % filepath)
if fileurl: if fileurl:
scm = SCM(fileurl) scm = SCM(fileurl, allow_password=self.options.allow_password_in_scm_url)
scm.assert_allowed(allowed=self.options.allowed_scms, scm.assert_allowed(allowed=self.options.allowed_scms,
session=self.session, session=self.session,
by_config=self.options.allowed_scms_use_config, by_config=self.options.allowed_scms_use_config,
@ -5012,7 +5013,7 @@ class BuildSRPMFromSCMTask(BaseBuildTask):
if opts is None: if opts is None:
opts = {} opts = {}
# will throw a BuildError if the url is invalid # will throw a BuildError if the url is invalid
scm = SCM(url) scm = SCM(url, allow_password=self.options.allow_password_in_scm_url)
scm.assert_allowed(allowed=self.options.allowed_scms, scm.assert_allowed(allowed=self.options.allowed_scms,
session=self.session, session=self.session,
by_config=self.options.allowed_scms_use_config, by_config=self.options.allowed_scms_use_config,
@ -6486,7 +6487,8 @@ def get_options():
'task_avail_delay': 300, 'task_avail_delay': 300,
'cert': None, 'cert': None,
'serverca': None, 'serverca': None,
'allow_noverifyssl': False} 'allow_noverifyssl': False,
'allow_password_in_scm_url': False}
if config.has_section('kojid'): if config.has_section('kojid'):
for name, value in config.items('kojid'): for name, value in config.items('kojid'):
if name in ['sleeptime', 'maxjobs', 'minspace', 'retry_interval', if name in ['sleeptime', 'maxjobs', 'minspace', 'retry_interval',
@ -6502,7 +6504,7 @@ def get_options():
'createrepo_update', 'use_fast_upload', 'support_rpm_source_layout', 'createrepo_update', 'use_fast_upload', 'support_rpm_source_layout',
'build_arch_can_fail', 'no_ssl_verify', 'log_timestamps', 'build_arch_can_fail', 'no_ssl_verify', 'log_timestamps',
'allow_noverifyssl', 'allowed_scms_use_config', 'allow_noverifyssl', 'allowed_scms_use_config',
'allowed_scms_use_policy']: 'allowed_scms_use_policy', 'allow_password_in_scm_url']:
defaults[name] = config.getboolean('kojid', name) defaults[name] = config.getboolean('kojid', name)
elif name in ['plugin', 'plugins']: elif name in ['plugin', 'plugins']:
defaults['plugin'] = value.split() defaults['plugin'] = value.split()

View file

@ -149,3 +149,6 @@ from_addr=Koji Build System <buildsys@example.com>
;allow passing noverifyssl option to anaconda for image builds ;allow passing noverifyssl option to anaconda for image builds
;allow_noverifyssl = False ;allow_noverifyssl = False
;allow password in SCM url
;allow_password_in_scm_url = False

View file

@ -209,7 +209,7 @@ class SCM(object):
else: else:
return False return False
def __init__(self, url): def __init__(self, url, allow_password=False):
""" """
Initialize the SCM object using the specified url. Initialize the SCM object using the specified url.
The expected url format is: The expected url format is:
@ -237,10 +237,14 @@ class SCM(object):
raise koji.GenericError('Invalid SCM URL: %s' % url) raise koji.GenericError('Invalid SCM URL: %s' % url)
self.url = url self.url = url
scheme, user, host, path, query, fragment = self._parse_url() scheme, user, password, host, path, query, fragment = self._parse_url(
allow_password=allow_password)
self.scheme = scheme self.scheme = scheme
self.user = user if password is not None:
self.user = '%s:%s' % (user, password)
else:
self.user = user
self.host = host self.host = host
self.repository = path self.repository = path
self.module = query self.module = query
@ -261,7 +265,7 @@ class SCM(object):
keys = ["url", "scheme", "user", "host", "repository", "module", "revision", "scmtype"] keys = ["url", "scheme", "user", "host", "repository", "module", "revision", "scmtype"]
return dslice(vars(self), keys) return dslice(vars(self), keys)
def _parse_url(self): def _parse_url(self, allow_password=False):
""" """
Parse the SCM url into usable components. Parse the SCM url into usable components.
Return the following tuple: Return the following tuple:
@ -275,23 +279,29 @@ class SCM(object):
# replace the scheme with http:// so that the urlparse works in all cases # replace the scheme with http:// so that the urlparse works in all cases
dummyurl = self.url.replace(scheme, 'http://', 1) dummyurl = self.url.replace(scheme, 'http://', 1)
dummyscheme, netloc, path, params, query, fragment = urllib.parse.urlparse(dummyurl) parsed_url = urllib.parse.urlparse(dummyurl)
path = parsed_url.path
params = parsed_url.params
query = parsed_url.query
fragment = parsed_url.fragment
user = None user = None
userhost = netloc.split('@') password = None
if len(userhost) == 2:
user = userhost[0] userhost = parsed_url.netloc.split('@')
if not user: if len(userhost) > 2:
# Don't return an empty string raise koji.GenericError('Invalid username@hostname specified: %s' % userhost)
user = None if not userhost:
elif ':' in user:
raise koji.GenericError('username:password format not supported: %s' % user)
netloc = userhost[1]
elif len(userhost) > 2:
raise koji.GenericError('Invalid username@hostname specified: %s' % netloc)
if not netloc:
raise koji.GenericError( raise koji.GenericError(
'Unable to parse SCM URL: %s . Could not find the netloc element.' % self.url) 'Unable to parse SCM URL: %s . Could not find the netloc element.' % self.url)
if parsed_url.username:
user = parsed_url.username
if parsed_url.password:
password = parsed_url.password
if password is not None and not allow_password:
raise koji.GenericError('username:password format not supported: %s:%s'
% (user, password))
netloc = parsed_url.hostname
# check for empty path before we apply normpath # check for empty path before we apply normpath
if not path: if not path:
@ -326,7 +336,7 @@ class SCM(object):
'Unable to parse SCM URL: %s . Could not find the fragment element.' % self.url) 'Unable to parse SCM URL: %s . Could not find the fragment element.' % self.url)
# return parsed values # return parsed values
return (scheme, user, netloc, path, query, fragment) return (scheme, user, password, netloc, path, query, fragment)
def assert_allowed(self, allowed='', session=None, by_config=True, by_policy=False, def assert_allowed(self, allowed='', session=None, by_config=True, by_policy=False,
policy_data=None): policy_data=None):