#!/usr/bin/python3 from __future__ import absolute_import, print_function import ast import mimetypes import os import os.path import optparse import pprint import sys from urllib.parse import quote from wsgiref.simple_server import make_server from wsgiref.util import setup_testing_defaults CWD = os.getcwd() sys.path.insert(0, CWD) sys.path.insert(1, os.path.join(CWD, 'www/lib')) sys.path.insert(1, os.path.join(CWD, 'www/kojiweb')) import wsgi_publisher import index as kojiweb_handlers def get_url(environ): url = environ['wsgi.url_scheme']+'://' if environ.get('HTTP_HOST'): url += environ['HTTP_HOST'] else: url += environ['SERVER_NAME'] if environ['wsgi.url_scheme'] == 'https': if environ['SERVER_PORT'] != '443': url += ':' + environ['SERVER_PORT'] else: if environ['SERVER_PORT'] != '80': url += ':' + environ['SERVER_PORT'] url += quote(environ.get('SCRIPT_NAME', '')) url += quote(environ.get('PATH_INFO', '')) if environ.get('QUERY_STRING'): url += '?' + environ['QUERY_STRING'] return url FIRST = True def do_static(environ, start_response): redirect = os.environ.get('STATIC_URL', '') if redirect: environ['STATIC_URL'] = redirect return redirect_static(environ, start_response) # otherwise serve our local static files path = environ.get('PATH_INFO', '') assert path.startswith('/koji-static') path = path[12:] path = path.lstrip('/') fn = os.path.join(CWD, 'www/static', path) if not os.path.exists(fn): print("No such file: %s" % fn) return do_404(environ, start_response) size = os.path.getsize(fn) ctype, encoding = mimetypes.guess_type(fn) headers = [ ('Content-Length', str(size)), ('Content-Type', ctype), ] start_response('200 OK', headers) return iter_file(fn) def do_404(environ, start_response): content = 'URL not found\n' headers = [ ('Content-Length', str(len(content))), ('Content-Type', 'text/plain'), ] start_response('404 Not Found', headers) return [content] def iter_file(fn): with open(fn, 'rb') as fo: while True: chunk = fo.read(8192) if not chunk: break yield chunk def redirect_static(environ, start_response): response = '' headers = [ ('Content-Length', str(len(response))), ('Content-Type', "text/plain"), ('Location', environ['STATIC_URL'] + environ['PATH_INFO']), ] start_response('302 Found', headers) return [response] def set_config(environ): lconfig = "%s/devtools/fakeweb.conf" % os.getcwd() lconfigd = "%s/devtools/fakeweb.conf.d" % os.getcwd() if os.path.exists(lconfig) or os.path.exists(lconfigd): environ['koji.web.ConfigFile'] = lconfig environ['koji.web.ConfigDir'] = lconfigd def application(environ, start_response): global FIRST setup_testing_defaults(environ) # provide some needed info environ['SCRIPT_FILENAME'] = wsgi_publisher.__file__ environ['REQUEST_URI'] = get_url(environ) environ['REQUEST_SCHEME'] = environ['wsgi.url_scheme'] set_config(environ) if FIRST: pprint.pprint(environ) FIRST = False path = environ.get('PATH_INFO', '') if path.startswith('/koji-static'): return do_static(environ, start_response) return wsgi_publisher.application(environ, start_response) def nice_literal(value): try: return ast.literal_eval(value) except (ValueError, SyntaxError): return value def get_options(): parser = optparse.OptionParser(usage='%prog [options]') # parser.add_option('--pdb', action='store_true', # help='drop into pdb on error') parser.add_option('--user', '-u', help='fake login as user') parser.add_option('--time', '-t', type="float", help='mock time as value') parser.add_option('-o', '--config-option', help='override config option', action='append', metavar='NAME=VALUE') opts, args = parser.parse_args() if args: parser.error('This command takes no args, just options') if opts.config_option: overrides = {} for s in opts.config_option: k, v = s.split('=', 1) v = nice_literal(v) overrides[k] = v opts.config_option = overrides return opts def override_load_config(opts): original_load_config = wsgi_publisher.Dispatcher.load_config def my_load_config(_self, environ): oldopts = original_load_config(_self, environ) oldopts.update(opts) _self.options = oldopts return oldopts wsgi_publisher.Dispatcher.load_config = my_load_config def fake_login(user): original_assertLogin = kojiweb_handlers._assertLogin original_getServer = kojiweb_handlers._getServer def my_assertLogin(environ): pass def my_getServer(environ): session = original_getServer(environ) environ['koji.currentUser'] = session.getUser(user) return session kojiweb_handlers._assertLogin = my_assertLogin kojiweb_handlers._getServer = my_getServer def main(): options = get_options() if options.config_option: override_load_config(options.config_option) if options.user: fake_login(options.user) if options.time: from unittest import mock import datetime dt = datetime.datetime.fromtimestamp(options.time) mock.patch('time.time', return_value=options.time).start() # mocking datetime is tricky, can't mock the c object directly # and can't mock the datetime class globally without breaking other code # so we just mock the handlers import of it my_datetime = mock.patch.object(kojiweb_handlers.kojiweb.util, 'datetime', wraps=datetime).start() my_datetime.datetime.now.return_value = dt # koji.add_file_logger('koji', 'fakeweb.log') httpd = make_server('', 8000, application) print("Serving kojiweb on http://localhost:8000 ...") httpd.serve_forever() if __name__ == '__main__': main()