diff --git a/koji/__init__.py b/koji/__init__.py
index 64716f48..7785b7ec 100644
--- a/koji/__init__.py
+++ b/koji/__init__.py
@@ -669,13 +669,12 @@ class RawHeader(object):
 
     # see Maximum RPM Appendix A: Format of the RPM File
 
-    def __init__(self, data):
-        if rpm is None:
-            raise GenericError("rpm's python bindings are not installed")
+    def __init__(self, data, decode=False):
         if data[0:3] != RPM_HEADER_MAGIC:
             raise GenericError("Invalid rpm header: bad magic: %r" % (data[0:3],))
         self.header = data
         self._index()
+        self.decode = decode
 
     def version(self):
         # fourth byte is the version
@@ -703,7 +702,7 @@ class RawHeader(object):
         self.datalen = dl
         self.index = index
 
-    def dump(self):
+    def dump(self, sig=None):
         print("HEADER DUMP:")
         # calculate start of store
         il = len(self.index)
@@ -714,35 +713,84 @@ class RawHeader(object):
         # sort entries by offset, dtype
         # also rearrange: tag, dtype, offset, count -> offset, dtype, tag, count
         order = sorted([(x[2], x[1], x[0], x[3]) for x in six.itervalues(self.index)])
-        next = store
         # map some rpmtag codes
         tags = {}
-        for name, code in six.iteritems(rpm.__dict__):
-            if name.startswith('RPMTAG_') and isinstance(code, int):
-                tags[code] = name[7:].lower()
+        if rpm:
+            for name, code in six.iteritems(rpm.__dict__):
+                if name.startswith('RPMTAG_') and isinstance(code, int):
+                    tags[code] = name[7:].lower()
+        else:
+            print("rpm's python bindings are not installed. Unable to convert tag codes")
+        if sig is None:
+            # detect whether this is a signature header
+            sig = bool(self.get(RPM_TAG_HEADERSIGNATURES))
+        if sig:
+            print("Parsing as a signature header")
+            # signature headers have a few different values
+            # the SIGTAG_* values are not exposed in the python api
+            # see rpmtag.h
+            tags[1000] = 'size'
+            tags[1001] = 'lemd5_1'
+            tags[1002] = 'pgp'
+            tags[1003] = 'lemd5_2'
+            tags[1004] = 'md5'
+            tags[1005] = 'gpg'
+            tags[1006] = 'pgp5'
+            tags[1007] = 'payloadsize'
+            tags[1008] = 'reservedspace'
+        # expect first entry at start
+        expected_ofs = store
         for entry in order:
             # tag, dtype, offset, count = entry
             offset, dtype, tag, count = entry
             pos = store + offset
-            if next is not None:
-                if pos > next:
+            if expected_ofs is not None:
+                # expected_ofs will be None after an unrecognized data type
+                # integer types are byte aligned for their size
+                align = None
+                pad = 0
+                if dtype == 3:  # INT16
+                    align = 2
+                elif dtype == 4:  # INT32
+                    align = 4
+                elif dtype == 5:  # INT64
+                    align = 8
+                if align:
+                    pad = (align - (expected_ofs % align)) % align
+                    expected_ofs += pad
+                if pos > expected_ofs:
                     print("** HOLE between entries")
-                    print("Hex: %s" % hex_string(self.header[next:pos]))
-                    print("Data: %r" % self.header[next:pos])
-                elif pos < next:
+                    print("Size: %d" % (pos - expected_ofs))
+                    print("Hex: %s" % hex_string(self.header[expected_ofs:pos]))
+                    print("Data: %r" % self.header[expected_ofs:pos])
+                    print("Padding: %i" % pad)
+                    print("Expected offset: 0x%x" % (expected_ofs - store))
+                elif pad and pos == expected_ofs - pad:
+                    print("** Missing expected padding")
+                    print("Padding: %i" % pad)
+                    print("Expected offset: 0x%x" % (expected_ofs - store))
+                elif pos < expected_ofs:
                     print("** OVERLAPPING entries")
-            print("Tag: %d [%s], Type: %d, Offset: %x, Count: %d"
+                    print("Overlap size: %d" % (expected_ofs - pos))
+                    print("Expected offset: 0x%x" % (expected_ofs - store))
+                elif pad:
+                    # pos == expected_ofs
+                    print("Alignment padding: %i" % pad)
+                    padbytes = self.header[pos - pad:pos]
+                    if padbytes != b'\0' * pad:
+                        print("NON-NULL padding bytes: %s" % hex_string(padbytes))
+            print("Tag: %d [%s], Type: %d, Offset: 0x%x, Count: %d"
                   % (tag, tags.get(tag, '?'), dtype, offset, count))
             if dtype == 0:
                 # null
                 print("[NULL entry]")
-                next = pos
+                expected_ofs = pos
             elif dtype == 1:
                 # char
                 for i in range(count):
                     print("Char: %r" % self.header[pos])
                     pos += 1
-                next = pos
+                expected_ofs = pos
             elif dtype >= 2 and dtype <= 5:
                 # integer
                 n = 1 << (dtype - 2)
@@ -752,98 +800,145 @@ class RawHeader(object):
                     num = multibyte(data)
                     print("Int(%d): %d" % (n, num))
                     pos += n
-                next = pos
+                expected_ofs = pos
             elif dtype == 6:
                 # string (null terminated)
                 end = self.header.find(six.b('\0'), pos)
+                value = self.header[pos:end]
                 try:
-                    print("String(%d): %r" % (end - pos, _decode_item(self.header[pos:end])))
-                except ValueError:
+                    value = self.decode_bytes(value)
+                except Exception:
                     print('INVALID STRING')
-                    print("String(%d): %r" % (end - pos, self.header[pos:end]))
-                    raise
-                next = end + 1
+                print("String(%d): %r" % (end - pos, value))
+                expected_ofs = end + 1
             elif dtype == 7:
                 print("Data: %s" % hex_string(self.header[pos:pos + count]))
-                next = pos + count
+                expected_ofs = pos + count
             elif dtype == 8:
                 # string array
                 for i in range(count):
                     end = self.header.find(six.b('\0'), pos)
-                    print("String(%d): %r" % (end - pos, self.header[pos:end]))
-                    pos = end + 1
-                next = pos
-            elif dtype == 9:
-                # unicode string array
-                for i in range(count):
-                    end = self.header.find(six.b('\0'), pos)
+                    value = self.header[pos:end]
                     try:
-                        print("i18n(%d): %r" % (end - pos, _decode_item(self.header[pos:end])))
+                        value = self.decode_bytes(value)
                     except Exception:
                         print('INVALID STRING')
-                        print("i18n(%d): %r" % (end - pos, self.header[pos:end]))
+                    print("String(%d): %r" % (end - pos, value))
                     pos = end + 1
-                next = pos
+                expected_ofs = pos
+            elif dtype == 9:
+                # i18n string array
+                for i in range(count):
+                    end = self.header.find(six.b('\0'), pos)
+                    value = self.header[pos:end]
+                    try:
+                        value = self.decode_bytes(value)
+                    except Exception:
+                        print('INVALID STRING')
+                    print("i18n(%d): %r" % (end - pos, value))
+                    pos = end + 1
+                expected_ofs = pos
             else:
-                print("Skipping data type %x" % dtype)
-                next = None
-        if next is not None:
+                print("Skipping data type 0x%x" % dtype)
+                expected_ofs = None
+        if expected_ofs is not None:
             pos = store + self.datalen
-            if next < pos:
+            if expected_ofs < pos:
                 print("** HOLE at end of data block")
-                print("Hex: %s" % hex_string(self.header[next:pos]))
-                print("Data: %r" % self.header[next:pos])
-            elif pos > next:
+                print("Size: %d" % (pos - expected_ofs))
+                print("Hex: %s" % hex_string(self.header[expected_ofs:pos]))
+                print("Data: %r" % self.header[expected_ofs:pos])
+                print("Offset: 0x%x" % self.datalen)
+            elif expected_ofs > pos:  # last entry ends beyond store + datalen
                 print("** OVERFLOW in data block")
+                print("Overflow size: %d" % (expected_ofs - pos))
+                print("Offset: 0x%x" % self.datalen)
+
+    def decode_bytes(self, value):
+        if six.PY2:
+            return value
+        else:
+            return value.decode(errors='surrogateescape')
 
     def __getitem__(self, key):
         tag, dtype, offset, count = self.index[key]
         assert tag == key
         return self._getitem(dtype, offset, count)
 
-    def _getitem(self, dtype, offset, count):
+    def _getitem(self, dtype, offset, count, decode=None):
+        if decode is None:
+            decode = self.decode
         # calculate start of store
         il = len(self.index)
         store = 16 + il * 16
         pos = store + offset
         if dtype >= 2 and dtype <= 5:
-            n = 1 << (dtype - 2)
-            # n-byte integer
-            data = [_ord(x) for x in self.header[pos:pos + n]]
-            return multibyte(data)
+            values = []
+            for _ in range(count):
+                n = 1 << (dtype - 2)
+                # n-byte integer
+                data = [_ord(x) for x in self.header[pos:pos + n]]
+                values.append(multibyte(data))
+                pos += n
+            return values
+        elif dtype == 1:
+            # char treated like int8
+            return [_ord(c) for c in self.header[pos:pos + count]]
         elif dtype == 6:
             # string (null terminated)
-            end = self.header.find('\0', pos)
-            return self.header[pos:end]
+            end = self.header.find(six.b('\0'), pos)
+            value = self.header[pos:end]
+            if decode:
+                value = self.decode_bytes(value)
+            return value
         elif dtype == 7:
             # raw data
             return self.header[pos:pos + count]
         elif dtype == 8:
             # string array
             result = []
-            for i in range(count):
+            for _ in range(count):
                 end = self.header.find(six.b('\0'), pos)
-                result.append(self.header[pos:end])
+                value = self.header[pos:end]
+                if decode:
+                    value = self.decode_bytes(value)
+                result.append(value)
                 pos = end + 1
             return result
         elif dtype == 9:
-            # unicode string array
+            # i18n string array
+            # note that we do not apply localization
             result = []
-            for i in range(count):
+            for _ in range(count):
                 end = self.header.find(six.b('\0'), pos)
-                result.append(_decode_item(self.header[pos:end]))
+                value = self.header[pos:end]
+                if decode:
+                    value = self.decode_bytes(value)
+                result.append(value)
                 pos = end + 1
             return result
         else:
-            # XXX - not all valid data types are handled
-            raise GenericError("Unable to read header data type: %x" % dtype)
+            raise GenericError("Unknown header data type: %x" % dtype)
 
-    def get(self, key, default=None):
+    def get(self, key, default=None, decode=None, single=False):
+        # With decode on, we will _mostly_ return the same value that rpmlib will.
+        # There are exceptions where rpmlib will automatically translate or update values, e.g.
+        # * fields that rpm treats as scalars
+        # * special tags like Headerimmutable
+        # * i18n string translations
+        # * the Fileclass extension tag that overlaps a concrete tag
+        # * auto converting PREINPROG/POSTINPROG/etc to string arrays for older rpms
         entry = self.index.get(key)
         if entry is None:
             return default
         else:
-            return self._getitem(*entry[1:])
+            value = self._getitem(*entry[1:], decode=decode)
+            if single and isinstance(value, list):
+                if len(value) == 1:
+                    return value[0]
+                else:
+                    raise ValueError('single value requested for array at key %s' % key)
+            return value
 
 
 def rip_rpm_sighdr(src):
diff --git a/tests/test_lib/test_header_sizes.py b/tests/test_lib/test_header_sizes.py
index dc603f85..5f73c9d0 100644
--- a/tests/test_lib/test_header_sizes.py
+++ b/tests/test_lib/test_header_sizes.py
@@ -45,11 +45,11 @@ class TestHeaderSizes(unittest.TestCase):
             size = None
             try:
                 tag = rpm.RPMTAG_LONGSIGSIZE
-                size = rh.get(tag)
+                size = rh.get(tag, single=True)
             except NameError:
                 pass
             if size is None:
-                size = rh.get(SIGTAG_SIZE)
+                size = rh.get(SIGTAG_SIZE, single=True)
 
             # Expected file size
             calc_size = s_lead + s_sig + size
diff --git a/tests/test_lib/test_rawheader_fields.py b/tests/test_lib/test_rawheader_fields.py
new file mode 100644
index 00000000..18e0663c
--- /dev/null
+++ b/tests/test_lib/test_rawheader_fields.py
@@ -0,0 +1,37 @@
+# coding=utf-8
+from __future__ import absolute_import
+import os.path
+import unittest
+
+import koji
+
+
+class TestRawHeaderFields(unittest.TestCase):
+
+    RPMFILES = [
+        "test-deps-1-1.fc24.x86_64.rpm",
+        "test-files-1-1.fc27.noarch.rpm",
+        "test-nosrc-1-1.fc24.nosrc.rpm",
+        "test-deps-1-1.fc24.x86_64.rpm.signed",
+        "test-nopatch-1-1.fc24.nosrc.rpm",
+        "test-src-1-1.fc24.src.rpm",
+    ]
+
+    def test_header_sizes(self):
+        for basename in self.RPMFILES:
+            fn = os.path.join(os.path.dirname(__file__), 'data/rpms', basename)
+
+            rh = koji.RawHeader(koji.rip_rpm_hdr(fn))
+            hdr = koji.get_rpm_header(fn)
+
+            for key in rh.index:
+                if key in (63, 1141):
+                    continue
+                ours = rh.get(key, decode=True)
+                theirs = hdr[key]
+                if type(ours) != type(theirs):
+                    if isinstance(ours, list) and len(ours) == 1 and ours[0] == theirs:
+                        # rpm is presenting as a scalar
+                        continue
+                # otherwise
+                self.assertEqual(ours, theirs)