PR#403 fixEncoding for changelogs

Merges #403
https://pagure.io/koji/pull-request/403

Fixes #349
https://pagure.io/koji/issue/349
This commit is contained in:
Mike McLean 2017-05-17 16:25:29 -04:00
commit a0941767a7
3 changed files with 52 additions and 17 deletions

View file

@ -9376,7 +9376,8 @@ class RootExports(object):
else:
results.append({'date': cldate, 'date_ts': cltime, 'author': clname, 'text': cltext})
return _applyQueryOpts(results, queryOpts)
results = _applyQueryOpts(results, queryOpts)
return koji.fixEncodingRecurse(results, remove_nonprintable=True)
def cancelBuild(self, buildID):
"""Cancel the build with the given buildID
@ -9884,7 +9885,7 @@ class RootExports(object):
headers = koji.get_header_fields(rpm_path, headers)
for key, value in headers.items():
if isinstance(value, basestring):
headers[key] = koji.fixEncoding(value)
headers[key] = koji.fixEncoding(value, remove_nonprintable=True)
return headers
queryRPMSigs = staticmethod(query_rpm_sigs)

View file

@ -2894,7 +2894,13 @@ def _taskLabel(taskInfo):
else:
return '%s (%s)' % (method, arch)
def fixEncoding(value, fallback='iso8859-15'):
# Characters with ordinals 0 through 31.
CONTROL_CHARS = [chr(i) for i in range(32)]
# The control characters that get stripped: every one of the above except
# carriage return, newline and tab, which are deliberately left in place.
NONPRINTABLE_CHARS = ''.join([c for c in CONTROL_CHARS if c not in '\r\n\t'])
# Byte-string form of the deletion set, as required by the two-argument
# translate(None, deletechars) call on byte strings.
_NONPRINTABLE_BYTES = NONPRINTABLE_CHARS.encode('ascii')
# Ordinal -> None mapping: the form in which text (unicode) translate()
# expects characters to delete.
_NONPRINTABLE_TABLE = dict.fromkeys(ord(c) for c in NONPRINTABLE_CHARS)


def removeNonprintable(value):
    """Return a copy of value with the NONPRINTABLE_CHARS removed.

    The usual input is a raw encoded byte string, which goes through the
    two-argument ``translate(None, deletechars)`` form. Text (unicode)
    values are accepted as well; their ``translate`` takes a single
    mapping argument, so the two-argument call would raise TypeError.

    Carriage return, newline and tab are preserved. The result has the
    same type as the input.
    """
    if isinstance(value, (bytes, bytearray)):
        # On Python 2 ``bytes`` is an alias of ``str``, so existing callers
        # that pass an encoded str take this branch, exactly as before.
        return value.translate(None, _NONPRINTABLE_BYTES)
    # Text input: mapping an ordinal to None deletes that character.
    return value.translate(_NONPRINTABLE_TABLE)
def fixEncoding(value, fallback='iso8859-15', remove_nonprintable=False):
"""
Convert value to a 'str' object encoded as UTF-8.
If value is not valid UTF-8 to begin with, assume it is
@ -2906,43 +2912,54 @@ def fixEncoding(value, fallback='iso8859-15'):
if isinstance(value, unicode):
# value is already unicode, so just convert it
# to a utf8-encoded str
return value.encode('utf8')
s = value.encode('utf8')
else:
# value is a str, but may be encoded in utf8 or some
# other non-ascii charset. Try to verify it's utf8, and if not,
# decode it using the fallback encoding.
try:
return value.decode('utf8').encode('utf8')
s = value.decode('utf8').encode('utf8')
except UnicodeDecodeError:
return value.decode(fallback).encode('utf8')
s = value.decode(fallback).encode('utf8')
if remove_nonprintable:
return removeNonprintable(s)
else:
return s
def fixEncodingRecurse(value, fallback='iso8859-15'):
def fixEncodingRecurse(value, fallback='iso8859-15', remove_nonprintable=False):
"""Recursively fix string encoding in an object
Similar behavior to fixEncoding, but recursive
"""
if isinstance(value, tuple):
return tuple([fixEncodingRecurse(x) for x in value])
return tuple([fixEncodingRecurse(x, fallback=fallback, remove_nonprintable=remove_nonprintable) for x in value])
elif isinstance(value, list):
return list([fixEncodingRecurse(x) for x in value])
return list([fixEncodingRecurse(x, fallback=fallback, remove_nonprintable=remove_nonprintable) for x in value])
elif isinstance(value, dict):
ret = {}
for k in value:
v = fixEncodingRecurse(value[k])
k = fixEncodingRecurse(k)
v = fixEncodingRecurse(value[k], fallback=fallback, remove_nonprintable=remove_nonprintable)
k = fixEncodingRecurse(k, fallback=fallback, remove_nonprintable=remove_nonprintable)
ret[k] = v
return ret
elif isinstance(value, unicode):
return value.encode('utf8')
if remove_nonprintable:
return removeNonprintable(value.encode('utf8'))
else:
return value.encode('utf8')
elif isinstance(value, str):
# value is a str, but may be encoded in utf8 or some
# other non-ascii charset. Try to verify it's utf8, and if not,
# decode it using the fallback encoding.
try:
return value.decode('utf8').encode('utf8')
except UnicodeDecodeError, err:
return value.decode(fallback).encode('utf8')
s = value.decode('utf8').encode('utf8')
except UnicodeDecodeError:
s = value.decode(fallback).encode('utf8')
if remove_nonprintable:
return removeNonprintable(s)
else:
return s
else:
return value

View file

@ -36,6 +36,11 @@ class FixEncodingTestCase(unittest.TestCase):
"""Test the fixEncoding function"""
for a, b in self.simple_values:
self.assertEqual(koji.fixEncoding(a), b)
self.assertEqual(koji.fixEncoding(b), b)
c = a.encode('utf16')
self.assertEqual(koji.fixEncoding(c, fallback='utf16'), b)
d = a[:-3] + u'\x00\x01' + a[-3:]
self.assertEqual(koji.fixEncoding(d, remove_nonprintable=True), b)
complex_values = [
# [ value, fixed ]
@ -43,8 +48,18 @@ class FixEncodingTestCase(unittest.TestCase):
[(), ()],
[None, None],
[[], []],
[{u'a': 'a' , 'b' : {'c': u'c'}},
{ 'a': 'a' , 'b' : {'c': 'c'}}],
[{u'a': 'a' , 'b' : {'c': u'c\x00'}},
{ 'a': 'a' , 'b' : {'c': 'c\x00'}}],
# iso8859-15 fallback
['g\xf3\xf0an daginn', 'g\xc3\xb3\xc3\xb0an daginn'],
]
nonprint = [
['hello\0world\0', 'helloworld'],
[u'hello\0world\0', 'helloworld'],
[[u'hello\0world\0'], ['helloworld']],
[{0: u'hello\0world\0'}, {0: 'helloworld'}],
[[{0: u'hello\0world\0'}], [{0: 'helloworld'}]],
]
def test_fixEncodingRecurse(self):
@ -53,6 +68,8 @@ class FixEncodingTestCase(unittest.TestCase):
self.assertEqual(koji.fixEncoding(a), b)
for a, b in self.complex_values:
self.assertEqual(koji.fixEncodingRecurse(a), b)
for a, b in self.nonprint:
self.assertEqual(koji.fixEncodingRecurse(a, remove_nonprintable=True), b)
if __name__ == '__main__':