199 lines
5.5 KiB
Python
Executable file
199 lines
5.5 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
from __future__ import absolute_import, print_function
|
|
|
|
import ast
|
|
import mimetypes
|
|
import os
|
|
import os.path
|
|
import optparse
|
|
import pprint
|
|
import sys
|
|
from urllib.parse import quote
|
|
from wsgiref.simple_server import make_server
|
|
from wsgiref.util import setup_testing_defaults
|
|
|
|
|
|
CWD = os.getcwd()
|
|
sys.path.insert(0, CWD)
|
|
sys.path.insert(1, os.path.join(CWD, 'www/lib'))
|
|
sys.path.insert(1, os.path.join(CWD, 'www/kojiweb'))
|
|
import wsgi_publisher
|
|
import index as kojiweb_handlers
|
|
|
|
|
|
def get_url(environ):
|
|
url = environ['wsgi.url_scheme']+'://'
|
|
|
|
if environ.get('HTTP_HOST'):
|
|
url += environ['HTTP_HOST']
|
|
else:
|
|
url += environ['SERVER_NAME']
|
|
|
|
if environ['wsgi.url_scheme'] == 'https':
|
|
if environ['SERVER_PORT'] != '443':
|
|
url += ':' + environ['SERVER_PORT']
|
|
else:
|
|
if environ['SERVER_PORT'] != '80':
|
|
url += ':' + environ['SERVER_PORT']
|
|
|
|
url += quote(environ.get('SCRIPT_NAME', ''))
|
|
url += quote(environ.get('PATH_INFO', ''))
|
|
if environ.get('QUERY_STRING'):
|
|
url += '?' + environ['QUERY_STRING']
|
|
return url
|
|
|
|
FIRST = True
|
|
|
|
|
|
def do_static(environ, start_response):
|
|
redirect = os.environ.get('STATIC_URL', '')
|
|
if redirect:
|
|
environ['STATIC_URL'] = redirect
|
|
return redirect_static(environ, start_response)
|
|
# otherwise serve our local static files
|
|
path = environ.get('PATH_INFO', '')
|
|
assert path.startswith('/koji-static')
|
|
path = path[12:]
|
|
path = path.lstrip('/')
|
|
fn = os.path.join(CWD, 'www/static', path)
|
|
if not os.path.exists(fn):
|
|
print("No such file: %s" % fn)
|
|
return do_404(environ, start_response)
|
|
size = os.path.getsize(fn)
|
|
ctype, encoding = mimetypes.guess_type(fn)
|
|
headers = [
|
|
('Content-Length', str(size)),
|
|
('Content-Type', ctype),
|
|
]
|
|
start_response('200 OK', headers)
|
|
return iter_file(fn)
|
|
|
|
|
|
def do_404(environ, start_response):
|
|
content = 'URL not found\n'
|
|
headers = [
|
|
('Content-Length', str(len(content))),
|
|
('Content-Type', 'text/plain'),
|
|
]
|
|
start_response('404 Not Found', headers)
|
|
return [content]
|
|
|
|
|
|
def iter_file(fn):
|
|
with open(fn, 'rb') as fo:
|
|
while True:
|
|
chunk = fo.read(8192)
|
|
if not chunk:
|
|
break
|
|
yield chunk
|
|
|
|
|
|
def redirect_static(environ, start_response):
|
|
response = ''
|
|
headers = [
|
|
('Content-Length', str(len(response))),
|
|
('Content-Type', "text/plain"),
|
|
('Location', environ['STATIC_URL'] + environ['PATH_INFO']),
|
|
]
|
|
start_response('302 Found', headers)
|
|
return [response]
|
|
|
|
|
|
def set_config(environ):
|
|
lconfig = "%s/devtools/fakeweb.conf" % os.getcwd()
|
|
lconfigd = "%s/devtools/fakeweb.conf.d" % os.getcwd()
|
|
if os.path.exists(lconfig) or os.path.exists(lconfigd):
|
|
environ['koji.web.ConfigFile'] = lconfig
|
|
environ['koji.web.ConfigDir'] = lconfigd
|
|
|
|
|
|
def application(environ, start_response):
|
|
global FIRST
|
|
setup_testing_defaults(environ)
|
|
# provide some needed info
|
|
environ['SCRIPT_FILENAME'] = wsgi_publisher.__file__
|
|
environ['REQUEST_URI'] = get_url(environ)
|
|
environ['REQUEST_SCHEME'] = environ['wsgi.url_scheme']
|
|
set_config(environ)
|
|
if FIRST:
|
|
pprint.pprint(environ)
|
|
FIRST = False
|
|
path = environ.get('PATH_INFO', '')
|
|
if path.startswith('/koji-static'):
|
|
return do_static(environ, start_response)
|
|
return wsgi_publisher.application(environ, start_response)
|
|
|
|
|
|
def nice_literal(value):
|
|
try:
|
|
return ast.literal_eval(value)
|
|
except (ValueError, SyntaxError):
|
|
return value
|
|
|
|
|
|
def get_options():
|
|
parser = optparse.OptionParser(usage='%prog [options]')
|
|
# parser.add_option('--pdb', action='store_true',
|
|
# help='drop into pdb on error')
|
|
parser.add_option('--user', '-u', help='fake login as user')
|
|
parser.add_option('-o', '--config-option', help='override config option',
|
|
action='append', metavar='NAME=VALUE')
|
|
opts, args = parser.parse_args()
|
|
|
|
if args:
|
|
parser.error('This command takes no args, just options')
|
|
|
|
if opts.config_option:
|
|
overrides = {}
|
|
for s in opts.config_option:
|
|
k, v = s.split('=', 1)
|
|
v = nice_literal(v)
|
|
overrides[k] = v
|
|
opts.config_option = overrides
|
|
|
|
return opts
|
|
|
|
|
|
def override_load_config(opts):
|
|
original_load_config = wsgi_publisher.Dispatcher.load_config
|
|
|
|
def my_load_config(_self, environ):
|
|
oldopts = original_load_config(_self, environ)
|
|
oldopts.update(opts)
|
|
_self.options = oldopts
|
|
return oldopts
|
|
|
|
wsgi_publisher.Dispatcher.load_config = my_load_config
|
|
|
|
|
|
def fake_login(user):
|
|
original_assertLogin = kojiweb_handlers._assertLogin
|
|
original_getServer = kojiweb_handlers._getServer
|
|
|
|
def my_assertLogin(environ):
|
|
pass
|
|
|
|
def my_getServer(environ):
|
|
session = original_getServer(environ)
|
|
environ['koji.currentUser'] = session.getUser(user)
|
|
return session
|
|
|
|
kojiweb_handlers._assertLogin = my_assertLogin
|
|
kojiweb_handlers._getServer = my_getServer
|
|
|
|
|
|
def main():
|
|
options = get_options()
|
|
if options.config_option:
|
|
override_load_config(options.config_option)
|
|
if options.user:
|
|
fake_login(options.user)
|
|
# koji.add_file_logger('koji', 'fakeweb.log')
|
|
httpd = make_server('', 8000, application)
|
|
print("Serving kojiweb on http://localhost:8000 ...")
|
|
httpd.serve_forever()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|