PR#4251: drop cgi import

Merges #4251
https://pagure.io/koji/pull-request/4251

Fixes: #3966
https://pagure.io/koji/issue/3966
python cgi module will be dropped in 3.13
This commit is contained in:
Mike McLean 2024-10-31 09:39:58 -04:00
commit ac22ca9a3b
12 changed files with 122 additions and 167 deletions

View file

@ -111,6 +111,7 @@ def application(environ, start_response):
# provide some needed info
environ['SCRIPT_FILENAME'] = wsgi_publisher.__file__
environ['REQUEST_URI'] = get_url(environ)
environ['REQUEST_SCHEME'] = environ['wsgi.url_scheme']
set_config(environ)
if FIRST:
pprint.pprint(environ)

View file

@ -1,11 +1,10 @@
from unittest import mock
import unittest
import cgi
import koji
from io import BytesIO
from .loadwebindex import webidx
from koji.server import ServerRedirect
from kojiweb.util import FieldStorageCompat
class TestBuildTargetCreate(unittest.TestCase):
@ -28,20 +27,12 @@ class TestBuildTargetCreate(unittest.TestCase):
def tearDown(self):
mock.patch.stopall()
def get_fs(self, urlencode_data):
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
return cgi.FieldStorage(fp=data, environ=urlencode_environ)
def get_fs(self, query):
return FieldStorageCompat({'QUERY_STRING': query})
def test_buildtargetcreate_add_case_exception(self):
"""Test buildtargetcreate function raises exception when buildtarget is not created."""
urlencode_data = b"add=True&name=testname&buildTag=11&destTag=99"
urlencode_data = "add=True&name=testname&buildTag=11&destTag=99"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -63,7 +54,7 @@ class TestBuildTargetCreate(unittest.TestCase):
def test_buildtargetcreate_add_case_valid(self):
"""Test buildtargetcreate function valid case (add)."""
urlencode_data = b"add=True&name=testname&buildTag=11&destTag=99"
urlencode_data = "add=True&name=testname&buildTag=11&destTag=99"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -85,7 +76,7 @@ class TestBuildTargetCreate(unittest.TestCase):
def test_buildtargetcreate_cancel_case(self):
"""Test buildtargetcreate function valid case (cancel)."""
urlencode_data = b"cancel=True"
urlencode_data = "cancel=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -104,7 +95,7 @@ class TestBuildTargetCreate(unittest.TestCase):
def test_buildtargetcreate_another_case(self):
"""Test buildtargetcreate function valid case (another)."""
urlencode_data = b"another=True"
urlencode_data = "another=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):

View file

@ -1,11 +1,10 @@
from unittest import mock
import unittest
import cgi
import koji
from io import BytesIO
from .loadwebindex import webidx
from koji.server import ServerRedirect
from kojiweb.util import FieldStorageCompat
class TestBuildTargetEdit(unittest.TestCase):
@ -28,20 +27,12 @@ class TestBuildTargetEdit(unittest.TestCase):
def tearDown(self):
mock.patch.stopall()
def get_fs(self, urlencode_data):
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
return cgi.FieldStorage(fp=data, environ=urlencode_environ)
def get_fs(self, query):
return FieldStorageCompat({'QUERY_STRING': query})
def test_buildtargetedit_exception(self):
"""Test buildtargetedit function raises exception when build target not exists."""
urlencode_data = b"save=True&name=testname&buildTag=11&destTag=99"
urlencode_data = "save=True&name=testname&buildTag=11&destTag=99"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -63,7 +54,7 @@ class TestBuildTargetEdit(unittest.TestCase):
def test_buildtargetedit_exception_build_tag(self):
"""Test buildtargetedit function raises exception when build tag not exists."""
urlencode_data = b"save=True&name=testname&buildTag=11&destTag=99"
urlencode_data = "save=True&name=testname&buildTag=11&destTag=99"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -86,7 +77,7 @@ class TestBuildTargetEdit(unittest.TestCase):
def test_buildtargetedit_exception_dest_tag(self):
"""Test buildtargetedit function raises exception when destination tag not exists."""
urlencode_data = b"save=True&name=testname&buildTag=11&destTag=99"
urlencode_data = "save=True&name=testname&buildTag=11&destTag=99"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -110,7 +101,7 @@ class TestBuildTargetEdit(unittest.TestCase):
def test_buildtargetedit_save_case_valid(self):
"""Test buildtargetedit function valid case (save)."""
urlencode_data = b"save=True&name=testname&buildTag=11&destTag=99"
urlencode_data = "save=True&name=testname&buildTag=11&destTag=99"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -140,7 +131,7 @@ class TestBuildTargetEdit(unittest.TestCase):
def test_buildtargetedit_cancel_case(self):
"""Test buildtargetedit function valid case (cancel)."""
self.server.getBuildTarget.return_value = {'id': int(self.buildtarget_id)}
urlencode_data = b"cancel=True"
urlencode_data = "cancel=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -161,7 +152,7 @@ class TestBuildTargetEdit(unittest.TestCase):
def test_buildtargetedit_another_case(self):
"""Test buildtargetedit function valid case (another)."""
urlencode_data = b"another=True"
urlencode_data = "another=True"
fs = self.get_fs(urlencode_data)
self.server.getBuildTarget.return_value = {'id': int(self.buildtarget_id)}

View file

@ -1,11 +1,10 @@
import unittest
import koji
import cgi
from unittest import mock
from io import BytesIO
from .loadwebindex import webidx
from koji.server import ServerRedirect
from kojiweb.util import FieldStorageCompat
class TestHostEdit(unittest.TestCase):
@ -27,16 +26,8 @@ class TestHostEdit(unittest.TestCase):
def tearDown(self):
mock.patch.stopall()
def get_fs(self, urlencode_data):
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
return cgi.FieldStorage(fp=data, environ=urlencode_environ)
def get_fs(self, query):
return FieldStorageCompat({'QUERY_STRING': query})
def test_hostedit_exception_host_not_existing(self):
"""Test hostedit function raises exception when host ID not exists."""
@ -55,8 +46,8 @@ class TestHostEdit(unittest.TestCase):
def test_hostedit_save_case_valid(self):
"""Test hostedit function valid case (save)."""
urlencode_data = b"save=True&arches=x86_64&capacity=1.0&description=test-desc&" \
b"comment=test-comment&enable=True&channels=default"
urlencode_data = "save=True&arches=x86_64&capacity=1.0&description=test-desc&" \
"comment=test-comment&enable=True&channels=default"
fs = self.get_fs(urlencode_data)
self.server.getHost.return_value = self.host_info
self.server.editHost.return_value = True
@ -84,7 +75,7 @@ class TestHostEdit(unittest.TestCase):
def test_hostedit_cancel_case_valid(self):
"""Test hostedit function valid case (cancel)."""
urlencode_data = b"cancel=True"
urlencode_data = "cancel=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -107,7 +98,7 @@ class TestHostEdit(unittest.TestCase):
def test_hostedit_another_case(self):
"""Test hostedit function valid case (another)."""
urlencode_data = b"another=True"
urlencode_data = "another=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):

View file

@ -1,11 +1,10 @@
from unittest import mock
import unittest
import cgi
import koji
from io import BytesIO
from .loadwebindex import webidx
from koji.server import ServerRedirect
from kojiweb.util import FieldStorageCompat
class TestNotificationCreate(unittest.TestCase):
@ -27,16 +26,8 @@ class TestNotificationCreate(unittest.TestCase):
def tearDown(self):
mock.patch.stopall()
def get_fs(self, urlencode_data):
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
return cgi.FieldStorage(fp=data, environ=urlencode_environ)
def get_fs(self, query):
return FieldStorageCompat({'QUERY_STRING': query})
def test_notificationcreate_add_case_not_logged(self):
"""Test notificationcreate function raises exception when user is not logged."""
@ -47,7 +38,7 @@ class TestNotificationCreate(unittest.TestCase):
},
'koji.currentUser': None,
}
urlencode_data = b"add=True&package=2&tag=11"
urlencode_data = "add=True&package=2&tag=11"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -66,7 +57,7 @@ class TestNotificationCreate(unittest.TestCase):
def test_notificationcreate_add_case_int(self):
"""Test notificationcreate function valid case (add)"""
urlencode_data = b"add=True&package=2&tag=11&success_only=True"
urlencode_data = "add=True&package=2&tag=11&success_only=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -87,7 +78,7 @@ class TestNotificationCreate(unittest.TestCase):
def test_notificationcreate_add_case_all(self):
"""Test notificationcreate function valid case (add)"""
urlencode_data = b"add=True&package=all&tag=all"
urlencode_data = "add=True&package=all&tag=all"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -107,7 +98,7 @@ class TestNotificationCreate(unittest.TestCase):
def test_notificationcreate_cancel_case(self):
"""Test notificationcreate function valid case (cancel)."""
urlencode_data = b"cancel=True"
urlencode_data = "cancel=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -126,7 +117,7 @@ class TestNotificationCreate(unittest.TestCase):
def test_notificationcreate_another_case(self):
"""Test notificationcreate function valid case (another)."""
urlencode_data = b"another=True"
urlencode_data = "another=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):

View file

@ -1,11 +1,10 @@
import unittest
import koji
import cgi
from unittest import mock
from io import BytesIO
from .loadwebindex import webidx
from koji.server import ServerRedirect
from kojiweb.util import FieldStorageCompat
class TestNotificationEdit(unittest.TestCase):
@ -28,16 +27,8 @@ class TestNotificationEdit(unittest.TestCase):
def tearDown(self):
mock.patch.stopall()
def get_fs(self, urlencode_data):
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
return cgi.FieldStorage(fp=data, environ=urlencode_environ)
def get_fs(self, query):
return FieldStorageCompat({'QUERY_STRING': query})
def test_notificationedit_exception(self):
"""Test notificationedit function raises exception when notification ID is not exists."""
@ -54,7 +45,7 @@ class TestNotificationEdit(unittest.TestCase):
def test_notificationedit_save_case_int(self):
"""Test notificationedit function valid case (save)."""
urlencode_data = b"save=True&package=2&tag=11&success_only=True"
urlencode_data = "save=True&package=2&tag=11&success_only=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -77,7 +68,7 @@ class TestNotificationEdit(unittest.TestCase):
def test_notificationedit_save_case_all(self):
"""Test notificationedit function valid case (all)."""
urlencode_data = b"save=True&package=all&tag=all"
urlencode_data = "save=True&package=all&tag=all"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -100,7 +91,7 @@ class TestNotificationEdit(unittest.TestCase):
def test_notificationedit_cancel_case(self):
"""Test notificationedit function valid case (cancel)."""
urlencode_data = b"cancel=True"
urlencode_data = "cancel=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -121,7 +112,7 @@ class TestNotificationEdit(unittest.TestCase):
def test_notificationedit_another_case(self):
"""Test notificationedit function valid case (another)."""
urlencode_data = b"another=True"
urlencode_data = "another=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):

View file

@ -1,10 +1,9 @@
from unittest import mock
import unittest
import cgi
import koji
from io import BytesIO
from .loadwebindex import webidx
from kojiweb.util import FieldStorageCompat
class TestSearch(unittest.TestCase):
@ -18,16 +17,8 @@ class TestSearch(unittest.TestCase):
},
'koji.currentUser': None,
}
urlencode_data = b"terms=test&type=package&match=testmatch"
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
self.fs = cgi.FieldStorage(fp=data, environ=urlencode_environ)
urlencode_data = "terms=test&type=package&match=testmatch"
self.fs = FieldStorageCompat({'QUERY_STRING': urlencode_data})
def __get_server(env):
env['koji.form'] = self.fs

View file

@ -1,10 +1,9 @@
import unittest
import cgi
from unittest import mock
from io import BytesIO
from .loadwebindex import webidx
from koji.server import ServerRedirect
from kojiweb.util import FieldStorageCompat
class TestTagCreate(unittest.TestCase):
@ -24,21 +23,13 @@ class TestTagCreate(unittest.TestCase):
def tearDown(self):
mock.patch.stopall()
def get_fs(self, urlencode_data):
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
return cgi.FieldStorage(fp=data, environ=urlencode_environ)
def get_fs(self, query):
return FieldStorageCompat({'QUERY_STRING': query})
def test_tagcreate_add_case_valid(self):
"""Test tagcreate function valid case (add)"""
urlencode_data = b"add=True&name=testname&arches=x86_64&locked=True&permission=1" \
b"&maven_support=True&maven_include_all=True"
urlencode_data = "add=True&name=testname&arches=x86_64&locked=True&permission=1" \
"&maven_support=True&maven_include_all=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -60,7 +51,7 @@ class TestTagCreate(unittest.TestCase):
def test_tagcreate_cancel_case_valid(self):
"""Test tagcreate function valid cases (cancel)."""
urlencode_data = b"cancel=True"
urlencode_data = "cancel=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -79,7 +70,7 @@ class TestTagCreate(unittest.TestCase):
def test_tagedit_another_case_valid(self):
"""Test tagedit function valid case (another)."""
urlencode_data = b"another=True"
urlencode_data = "another=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):

View file

@ -1,11 +1,10 @@
import unittest
import koji
import cgi
from unittest import mock
from io import BytesIO
from .loadwebindex import webidx
from koji.server import ServerRedirect
from kojiweb.util import FieldStorageCompat
class TestTagEdit(unittest.TestCase):
@ -26,16 +25,9 @@ class TestTagEdit(unittest.TestCase):
def tearDown(self):
mock.patch.stopall()
def get_fs(self, urlencode_data):
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
return cgi.FieldStorage(fp=data, environ=urlencode_environ)
def get_fs(self, query):
environ = {'QUERY_STRING': query}
return FieldStorageCompat(environ)
def test_tagedit_exception(self):
"""Test tagedit function raises exception when tag ID not exists."""
@ -52,8 +44,8 @@ class TestTagEdit(unittest.TestCase):
def test_tagedit_add_case_valid(self):
"""Test tagedit function valid case (save)."""
urlencode_data = b"save=True&name=testname&arches=x86_64&locked=True&permission=1" \
b"&maven_support=True&maven_include_all=True"
urlencode_data = "save=True&name=testname&arches=x86_64&locked=True&permission=1" \
"&maven_support=True&maven_include_all=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -77,7 +69,7 @@ class TestTagEdit(unittest.TestCase):
def test_tagedit_cancel_case_valid(self):
"""Test tagedit function valid case (cancel)."""
urlencode_data = b"cancel=True"
urlencode_data = "cancel=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -99,7 +91,7 @@ class TestTagEdit(unittest.TestCase):
def test_tagedit_another_case_valid(self):
"""Test tagedit function valid case (another)."""
urlencode_data = b"another=True"
urlencode_data = "another=True"
fs = self.get_fs(urlencode_data)
def __get_server(env):

View file

@ -1,12 +1,11 @@
from __future__ import absolute_import
import unittest
from unittest import mock
import cgi
import koji
from io import BytesIO
from koji.server import ServerRedirect
from .loadwebindex import webidx
from kojiweb.util import FieldStorageCompat
class TestActiveSessionDelete(unittest.TestCase):
@ -33,16 +32,8 @@ class TestActiveSessionDelete(unittest.TestCase):
def tearDown(self):
mock.patch.stopall()
def get_fs(self, urlencode_data):
urlencode_environ = {
'CONTENT_LENGTH': str(len(urlencode_data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': '',
'REQUEST_METHOD': 'POST',
}
data = BytesIO(urlencode_data)
data.seek(0)
return cgi.FieldStorage(fp=data, environ=urlencode_environ)
def get_fs(self, query):
return FieldStorageCompat({'QUERY_STRING': query})
def test_tagparent_remove(self):
"""Test tagparent function with remove action."""
@ -103,8 +94,8 @@ class TestActiveSessionDelete(unittest.TestCase):
parent_id = 123
action = 'add'
data = [{'parent_id': 111}]
urlencode_data = (b"add=true&priority=10&maxdepth=5&pkg_filter=pkg_filter&"
b"intransitive=true&noconfig=false")
urlencode_data = ("add=true&priority=10&maxdepth=5&pkg_filter=pkg_filter&"
"intransitive=true&noconfig=false")
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -130,7 +121,7 @@ class TestActiveSessionDelete(unittest.TestCase):
tag_id = 456
parent_id = 123
action = 'add'
urlencode_data = b"cancel=true"
urlencode_data = "cancel=true"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -154,7 +145,7 @@ class TestActiveSessionDelete(unittest.TestCase):
action = 'add'
data = [{'parent_id': 111, 'priority': 1}]
urlencode_data = b"edit=true"
urlencode_data = "edit=true"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -177,7 +168,7 @@ class TestActiveSessionDelete(unittest.TestCase):
action = 'add'
data = [{'parent_id': 123, 'priority': 1}]
urlencode_data = b"edit=true"
urlencode_data = "edit=true"
fs = self.get_fs(urlencode_data)
def __get_server(env):
@ -202,7 +193,7 @@ class TestActiveSessionDelete(unittest.TestCase):
{'parent_id': 123, 'priority': 2},
{'parent_id': 123, 'priority': 3}]
urlencode_data = b"edit=true"
urlencode_data = "edit=true"
fs = self.get_fs(urlencode_data)
def __get_server(env):

View file

@ -19,7 +19,6 @@
# Authors:
# Mike McLean <mikem@redhat.com>
import cgi
import inspect
import logging
import os.path
@ -241,29 +240,14 @@ class Dispatcher(object):
func = self.handler_index.get(method)
if not func:
raise URLNotFound
# parse form args
data = {}
fs = cgi.FieldStorage(fp=environ['wsgi.input'],
environ=environ.copy(),
keep_blank_values=True)
for field in fs.list:
if field.filename:
val = field
else:
val = field.value
data.setdefault(field.name, []).append(val)
# replace singleton lists with single values
# XXX - this is a bad practice, but for now we strive to emulate mod_python.publisher
for arg in data:
val = data[arg]
if isinstance(val, list) and len(val) == 1:
data[arg] = val[0]
# parse url args
fs = kojiweb.util.FieldStorageCompat(environ)
environ['koji.form'] = fs
args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, ann = \
inspect.getfullargspec(func)
if not varkw:
# remove any unexpected args
data = dslice(data, args, strict=False)
data = dslice(fs.data, args, strict=False)
# TODO (warning in header or something?)
return func, data

View file

@ -26,10 +26,11 @@ import re
import ssl
import stat
import urllib
# a bunch of exception classes that explainError needs
from socket import error as socket_error
from xml.parsers.expat import ExpatError
from collections.abc import Mapping
from functools import wraps
from socket import error as socket_error
from urllib.parse import parse_qs
from xml.parsers.expat import ExpatError
import Cheetah.Template
@ -206,6 +207,55 @@ def _getValidTokens(environ):
return tokens
class FieldStorageCompat(Mapping):
"""Emulate the parts of cgi.FieldStorage that we need"""
def __init__(self, environ):
data = parse_qs(environ.get('QUERY_STRING', ''), strict_parsing=True,
keep_blank_values=True)
# replace singleton lists with single values
for arg in data:
val = data[arg]
if isinstance(val, list) and len(val) == 1:
data[arg] = val[0]
self.data = data
# we need getitem, iter, and len for the Mapping inheritance to work
def __getitem__(self, key):
return FieldCompat(self.data[key])
def __iter__(self):
iter(self.data)
def __len__(self):
return len(self.data)
def getfirst(self, name, default=None):
"""Get first value from list entries"""
value = self.data.get(name, default)
if isinstance(value, (list, tuple)):
return value[0]
else:
return value
def getlist(self, name):
"""Get value, wrap in list if not already"""
if name not in self.data:
return []
value = self.data[name]
if isinstance(value, (list, tuple)):
return value
else:
return [value]
class FieldCompat:
def __init__(self, value):
self.value = value
def toggleOrder(template, sortKey, orderVar='order'):
"""
If orderVar equals 'sortKey', return '-sortKey', else