188 lines
5.2 KiB
Python
Executable file
188 lines
5.2 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
from __future__ import absolute_import, print_function
|
|
|
|
import ast
|
|
import io
|
|
import optparse
|
|
import os
|
|
import os.path
|
|
import pprint
|
|
import sys
|
|
|
|
from urllib.parse import quote
|
|
|
|
sys.path.insert(0, os.getcwd())
|
|
import koji
|
|
from kojihub import auth, kojixmlrpc, kojihub, db
|
|
from koji.context import context
|
|
import koji.xmlrpcplus
|
|
|
|
|
|
# Debugging replacement for ModXMLRPCRequestHandler.handle_rpc, used when
# --pdb is given. The real implementation is saved first so the wrapper
# below can delegate to it.
original_handle_rpc = kojixmlrpc.ModXMLRPCRequestHandler.handle_rpc


def handle_rpc(self, environ):
    """Delegate to the original handle_rpc, entering pdb if it raises.

    When the original handler raises an ``Exception``, the traceback is
    printed, ``pdb.post_mortem`` is started on it, and the exception is then
    re-raised. Otherwise the original handler's return value is returned.
    """
    try:
        outcome = original_handle_rpc(self, environ)
    except Exception:
        # imported lazily: only needed on the failure path
        import pdb
        import traceback
        traceback.print_exc()
        pdb.post_mortem(sys.exc_info()[2])
        # re-raise or the app handler will commit
        raise
    return outcome
|
|
|
|
|
|
# Fake session for authenticated calls
class FakeSession(auth.Session):
    """Session object filled with fixed placeholder values for one user.

    The attributes are assigned directly; ``auth.Session.__init__`` is not
    invoked.
    """

    def __init__(self, user, exclusive=False):
        """Build the fake session.

        :param user: handed to ``kojihub.get_user(user, strict=True)`` to
            fetch the user record
        :param exclusive: stored unchanged as ``self.exclusive``
        """
        user_info = kojihub.get_user(user, strict=True)

        # session identity and state (all hard-coded)
        self.logged_in = True
        self.id = 1
        self.master = None
        self.callnum = 1
        self.exclusive = exclusive
        self.authtype = koji.AUTHTYPES['GSSAPI']
        self.hostip = '127.0.0.1'

        # the user this session acts as
        self.user_id = user_info['id']
        self.user_data = user_info

        # markers making it obvious the session is not real
        self.message = 'THIS IS A FAKE SESSION'
        self.session_data = {'msg': 'this is a fake session'}

        # private fields initialised to "nothing loaded"
        # NOTE(review): presumably auth.Session fills these lazily and ''
        # is its "host id not looked up yet" marker -- confirm against auth
        self._perms = None
        self._groups = None
        self._host_id = ''
|
|
|
|
|
|
# keep the real implementation so the replacement below can fall back to it
original_check_session = kojixmlrpc.ModXMLRPCRequestHandler.check_session


# alternate check_session, used to emulate auth
def check_session(self):
    """Install a FakeSession when a fake user is requested in the environ.

    If ``context.environ`` contains ``KOJI_FAKEHUB_USER``, ``context.session``
    is set to a ``FakeSession`` for that user, with the exclusive flag taken
    from ``KOJI_FAKEHUB_EXCLUSIVE`` (default ``False``). Otherwise the
    original ``check_session`` is called.
    """
    env = context.environ
    if 'KOJI_FAKEHUB_USER' not in env:
        original_check_session(self)
        return
    context.session = FakeSession(
        env['KOJI_FAKEHUB_USER'],
        env.get('KOJI_FAKEHUB_EXCLUSIVE', False))
|
|
|
|
|
|
def get_url(environ):
    """Rebuild the request URL from a WSGI environ dictionary.

    The host part is ``HTTP_HOST`` when that key is present and non-empty;
    otherwise it is ``SERVER_NAME``, followed by ``:SERVER_PORT`` unless the
    port is the scheme's default ('443' for https, '80' for anything else).
    ``SCRIPT_NAME`` and ``PATH_INFO`` are URL-quoted and appended, and a
    non-empty ``QUERY_STRING`` is appended verbatim after a '?'.

    :param environ: mapping with at least 'wsgi.url_scheme' and either
        'HTTP_HOST' or both 'SERVER_NAME' and 'SERVER_PORT'
    :returns: the reconstructed URL as a string
    """
    scheme = environ['wsgi.url_scheme']
    parts = [scheme, '://']

    host = environ.get('HTTP_HOST')
    if host:
        parts.append(host)
    else:
        parts.append(environ['SERVER_NAME'])
        # only spell the port out when it is not the scheme default
        default_port = '443' if scheme == 'https' else '80'
        port = environ['SERVER_PORT']
        if port != default_port:
            parts.append(':' + port)

    for key in ('SCRIPT_NAME', 'PATH_INFO'):
        parts.append(quote(environ.get(key, '')))

    query = environ.get('QUERY_STRING')
    if query:
        parts.append('?' + query)
    return ''.join(parts)
|
|
|
|
|
|
def nice_literal(value):
    """Interpret *value* as a Python literal, falling back to *value* itself.

    :param value: text to interpret (typically a command-line argument)
    :returns: the result of ``ast.literal_eval(value)`` when the text is a
        valid literal, otherwise *value* unchanged
    """
    try:
        return ast.literal_eval(value)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # literal_eval is documented to raise any of these on malformed
        # input (e.g. TypeError for an unhashable dict key such as
        # "{[1]: 2}"); in every case the text is simply not a literal, so
        # hand it back as a plain string
        return value
|
|
|
|
|
|
def get_options():
    """Parse the command line and build the XML-RPC request to send.

    The first positional argument is the method name and the remaining ones
    are its call arguments. An argument of the form ``name=value`` whose
    name is a valid identifier becomes a keyword argument; every other
    argument is positional. All values go through ``nice_literal``.

    :returns: the optparse options object, with the serialized request
        stored in ``opts.request``
    :raises SystemExit: via ``parser.error`` when no method name is given
    """
    parser = optparse.OptionParser(
        usage='%prog [options] <method> [arg|name=value ...]')
    parser.add_option('--pdb', action='store_true',
                      help='drop into pdb on error')
    parser.add_option('--user', '-u', help='execute as user')
    parser.add_option('--exclusive', '-x', action='store_true',
                      help='emulate an exclusive session')
    parser.add_option('-n', '--no-commit', action='store_true',
                      help='skip commit')
    opts, args = parser.parse_args()

    if not args:
        # report a usage error instead of failing with IndexError on args[0]
        parser.error('no method name given')

    # parse request from args
    method = args[0]
    callargs = []
    kwargs = {}
    for s in args[1:]:
        if '=' in s:
            k, v = s.split('=', 1)
            if k.isidentifier():
                kwargs[k] = nice_literal(v)
                continue
        # no '=' or the left side is not a usable keyword name:
        # keep the whole argument as a positional value
        callargs.append(nice_literal(s))
    callargs = koji.encode_args(*callargs, **kwargs)
    opts.request = koji.xmlrpcplus.dumps(callargs, method, allow_none=1)

    return opts
|
|
|
|
|
|
def start_response(status, headers):
    """Stand-in for the WSGI start_response callable that only prints.

    Each of *status* and *headers* is %r-formatted into a labelled string
    which is then passed to ``pprint.pprint``. Because the thing being
    pretty-printed is a string, the output lines appear as quoted string
    literals. Nothing is returned.
    """
    for template, value in (("Status: %r", status), ("Headers: %r", headers)):
        pprint.pprint(template % value)
|
|
|
|
|
|
def parse_response(data):
    """Decode an XML-RPC response supplied as an iterable of chunks.

    Every chunk of *data* is fed to a parser obtained from
    ``koji.xmlrpcplus.getparser()``; the unmarshalled values are then
    collected.

    :param data: iterable of response chunks
    :returns: the single value when exactly one was unmarshalled, otherwise
        the whole collection of values
    """
    parser, unmarshaller = koji.xmlrpcplus.getparser()
    for piece in data:
        parser.feed(piece)
    parser.close()
    values = unmarshaller.close()
    # unwrap the common one-value case for nicer output
    return values[0] if len(values) == 1 else values
|
|
|
|
|
|
def set_config(environ):
    """Point the hub at a local devtools config when one is present.

    Looks for ``devtools/fakehub.conf`` and ``devtools/fakehub.conf.d``
    under the current working directory. If either exists, both paths are
    stored in *environ* under 'koji.hub.ConfigFile' and 'koji.hub.ConfigDir';
    otherwise *environ* is left untouched.
    """
    cwd = os.getcwd()
    lconfig = "%s/devtools/fakehub.conf" % cwd
    lconfigd = "%s/devtools/fakehub.conf.d" % cwd
    if not os.path.exists(lconfig) and not os.path.exists(lconfigd):
        # no local configuration: keep whatever defaults the hub uses
        return
    environ['koji.hub.ConfigFile'] = lconfig
    environ['koji.hub.ConfigDir'] = lconfigd
|
|
|
|
|
|
def skip_commit(cnx):
    """No-op commit: print a notice instead of committing.

    main() assigns this function to ``db.DBWrapper.commit`` when --no-commit
    is given, so *cnx* receives the object that ``commit`` is called on; it
    is not used.
    """
    notice = 'Skipping commit'
    print(notice)
|
|
|
|
|
|
def main():
    """Run one hub call in-process and pretty-print its result.

    Applies the monkeypatches selected on the command line, builds a minimal
    WSGI environ carrying the request produced by get_options(), invokes
    ``kojixmlrpc.application`` with it and prints the decoded response.
    """
    options = get_options()

    # optional monkeypatches selected by command-line flags
    handler_cls = kojixmlrpc.ModXMLRPCRequestHandler
    if options.pdb:
        handler_cls.handle_rpc = handle_rpc
    if options.no_commit:
        db.DBWrapper.commit = skip_commit

    # fixed server identity; the request URI is derived from these values
    environ = {
        'SCRIPT_FILENAME': kojixmlrpc.__file__,
        'wsgi.url_scheme': 'https',
        'SERVER_NAME': 'myserver',
        'SERVER_PORT': '443',
    }
    environ['REQUEST_URI'] = get_url(environ)

    # the request itself: an XML document sent as a POST body
    environ['wsgi.input'] = io.StringIO(options.request)
    environ['REQUEST_METHOD'] = 'POST'
    environ['CONTENT_TYPE'] = 'text/xml'

    if options.user:
        # emulate an authenticated session for the requested user
        environ['KOJI_FAKEHUB_USER'] = options.user
        handler_cls.check_session = check_session
    if options.exclusive:
        environ['KOJI_FAKEHUB_EXCLUSIVE'] = True

    set_config(environ)
    print('RESULT:')
    response = kojixmlrpc.application(environ, start_response)
    pprint.pprint(parse_response(response))


if __name__ == '__main__':
    main()
|