From 46b15774bd718543206c77b758a68c9f130ad4b8 Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Thu, 2 Mar 2023 14:21:42 -0500 Subject: [PATCH] decode option in RawHeader, handle integer counts --- koji/__init__.py | 71 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/koji/__init__.py b/koji/__init__.py index 4dcfecc0..a70cf07d 100644 --- a/koji/__init__.py +++ b/koji/__init__.py @@ -669,11 +669,12 @@ class RawHeader(object): # see Maximum RPM Appendix A: Format of the RPM File - def __init__(self, data): + def __init__(self, data, decode=False): if data[0:3] != RPM_HEADER_MAGIC: raise GenericError("Invalid rpm header: bad magic: %r" % (data[0:3],)) self.header = data self._index() + self.decode = decode def version(self): # fourth byte is the version @@ -803,12 +804,12 @@ class RawHeader(object): elif dtype == 6: # string (null terminated) end = self.header.find(six.b('\0'), pos) + value = self.header[pos:end] try: - print("String(%d): %r" % (end - pos, _decode_item(self.header[pos:end]))) - except ValueError: + value = _decode_item(value) + except Exception: print('INVALID STRING') - print("String(%d): %r" % (end - pos, self.header[pos:end])) - raise + print("String(%d): %r" % (end - pos, value)) expected_ofs = end + 1 elif dtype == 7: print("Data: %s" % hex_string(self.header[pos:pos + count])) @@ -817,18 +818,24 @@ class RawHeader(object): # string array for i in range(count): end = self.header.find(six.b('\0'), pos) - print("String(%d): %r" % (end - pos, self.header[pos:end])) + value = self.header[pos:end] + try: + value = _decode_item(value) + except Exception: + print('INVALID STRING') + print("String(%d): %r" % (end - pos, value)) pos = end + 1 expected_ofs = pos elif dtype == 9: - # unicode string array + # i18n string array for i in range(count): end = self.header.find(six.b('\0'), pos) + value = self.header[pos:end] try: - print("i18n(%d): %r" % (end - pos, _decode_item(self.header[pos:end]))) + value = _decode_item(value) except Exception: print('INVALID STRING') - print("i18n(%d): %r" % (end - pos, self.header[pos:end])) + print("i18n(%d): %r" % (end - pos, value)) pos = end + 1 expected_ofs = pos else: @@ -852,20 +859,32 @@ class RawHeader(object): assert tag == key return self._getitem(dtype, offset, count) - def _getitem(self, dtype, offset, count): + def _getitem(self, dtype, offset, count, decode=None): + if decode is None: + decode = self.decode # calculate start of store il = len(self.index) store = 16 + il * 16 pos = store + offset if dtype >= 2 and dtype <= 5: - n = 1 << (dtype - 2) - # n-byte integer - data = [_ord(x) for x in self.header[pos:pos + n]] - return multibyte(data) + values = [] + for _ in range(count): + n = 1 << (dtype - 2) + # n-byte integer + data = [_ord(x) for x in self.header[pos:pos + n]] + values.append(multibyte(data)) + pos += n + return values + elif dtype == 1: + # char treated like int8 + return [_ord(c) for c in self.header[pos:pos + count]] elif dtype == 6: # string (null terminated) - end = self.header.find('\0', pos) - return self.header[pos:end] + end = self.header.find(six.b('\0'), pos) + value = self.header[pos:end] + if decode: + value = _decode_item(value) + return value elif dtype == 7: # raw data return self.header[pos:pos + count] @@ -874,27 +893,33 @@ class RawHeader(object): result = [] for _ in range(count): end = self.header.find(six.b('\0'), pos) - result.append(self.header[pos:end]) + value = self.header[pos:end] + if decode: + value = _decode_item(value) + result.append(value) pos = end + 1 return result elif dtype == 9: - # unicode string array + # i18n string array + # note that we do not apply localization result = [] for _ in range(count): end = self.header.find(six.b('\0'), pos) - result.append(_decode_item(self.header[pos:end])) + value = self.header[pos:end] + if decode: + value = _decode_item(value) + result.append(value) pos = end + 1 return result else: - # XXX - not all valid data types are handled - raise GenericError("Unable to read header data type: %x" % dtype) + raise GenericError("Unknown header data type: %x" % dtype) - def get(self, key, default=None): + def get(self, key, default=None, decode=None): entry = self.index.get(key) if entry is None: return default else: - return self._getitem(*entry[1:]) + return self._getitem(*entry[1:], decode=decode) def rip_rpm_sighdr(src):