PR#3703: RawHeader improvements

Merges #3703
https://pagure.io/koji/pull-request/3703

Fixes: #3713
https://pagure.io/koji/issue/3713
RawHeader - get list of strings items
This commit is contained in:
Tomas Kopecek 2023-03-23 13:20:14 +01:00
commit c0d537c4f0
3 changed files with 190 additions and 58 deletions

View file

@ -669,13 +669,12 @@ class RawHeader(object):
# see Maximum RPM Appendix A: Format of the RPM File
def __init__(self, data, decode=False):
    """Wrap a raw rpm header blob and build its tag index.

    :param data: the raw header bytes; must begin with RPM_HEADER_MAGIC
    :param decode: default for whether string values are decoded by
                   _getitem()/get() when the caller does not pass an
                   explicit ``decode`` argument
    :raises GenericError: if the first three bytes are not the header magic
    """
    # The rpm python bindings are not required to parse a header, so their
    # absence is no longer an error here.
    if data[0:3] != RPM_HEADER_MAGIC:
        raise GenericError("Invalid rpm header: bad magic: %r" % (data[0:3],))
    self.header = data
    self._index()
    self.decode = decode
def version(self):
# fourth byte is the version
@ -703,7 +702,7 @@ class RawHeader(object):
self.datalen = dl
self.index = index
def dump(self):
def dump(self, sig=None):
print("HEADER DUMP:")
# calculate start of store
il = len(self.index)
@ -714,35 +713,84 @@ class RawHeader(object):
# sort entries by offset, dtype
# also rearrange: tag, dtype, offset, count -> offset, dtype, tag, count
order = sorted([(x[2], x[1], x[0], x[3]) for x in six.itervalues(self.index)])
next = store
# map some rpmtag codes
tags = {}
for name, code in six.iteritems(rpm.__dict__):
if name.startswith('RPMTAG_') and isinstance(code, int):
tags[code] = name[7:].lower()
if rpm:
for name, code in six.iteritems(rpm.__dict__):
if name.startswith('RPMTAG_') and isinstance(code, int):
tags[code] = name[7:].lower()
else:
print("rpm's python bindings are not installed. Unable to convert tag codes")
if sig is None:
# detect whether this is a signature header
sig = bool(self.get(RPM_TAG_HEADERSIGNATURES))
if sig:
print("Parsing as a signature header")
# signature headers have a few different values
# the SIGTAG_* values are not exposed in the python api
# see rpmtag.h
tags[1000] = 'size'
tags[1001] = 'lemd5_1'
tags[1002] = 'pgp'
tags[1003] = 'lemd5_2'
tags[1004] = 'md5'
tags[1005] = 'gpg'
tags[1006] = 'pgp5'
tags[1007] = 'payloadsize'
tags[1008] = 'reservedspace'
# expect first entry at start
expected_ofs = store
for entry in order:
# tag, dtype, offset, count = entry
offset, dtype, tag, count = entry
pos = store + offset
if next is not None:
if pos > next:
if expected_ofs is not None:
# expected_ofs will be None after an unrecognized data type
# integer types are byte aligned for their size
align = None
pad = 0
if dtype == 3: # INT16
align = 2
elif dtype == 4: # INT32
align = 4
elif dtype == 5: # INT64
align = 8
if align:
pad = (align - (expected_ofs % align)) % align
expected_ofs += pad
if pos > expected_ofs:
print("** HOLE between entries")
print("Hex: %s" % hex_string(self.header[next:pos]))
print("Data: %r" % self.header[next:pos])
elif pos < next:
print("Size: %d" % (pos - expected_ofs))
print("Hex: %s" % hex_string(self.header[expected_ofs:pos]))
print("Data: %r" % self.header[expected_ofs:pos])
print("Padding: %i" % pad)
print("Expected offset: 0x%x" % (expected_ofs - store))
elif pad and pos == expected_ofs - pad:
print("** Missing expected padding")
print("Padding: %i" % pad)
print("Expected offset: 0x%x" % (expected_ofs - store))
elif pos < expected_ofs:
print("** OVERLAPPING entries")
print("Tag: %d [%s], Type: %d, Offset: %x, Count: %d"
print("Overlap size: %d" % (expected_ofs - pos))
print("Expected offset: 0x%x" % (expected_ofs - store))
elif pad:
# pos == expected_ofs
print("Alignment padding: %i" % pad)
padbytes = self.header[pos - pad:pos]
if padbytes != b'\0' * pad:
print("NON-NULL padding bytes: %s" % hex_string(padbytes))
print("Tag: %d [%s], Type: %d, Offset: 0x%x, Count: %d"
% (tag, tags.get(tag, '?'), dtype, offset, count))
if dtype == 0:
# null
print("[NULL entry]")
next = pos
expected_ofs = pos
elif dtype == 1:
# char
for i in range(count):
print("Char: %r" % self.header[pos])
pos += 1
next = pos
expected_ofs = pos
elif dtype >= 2 and dtype <= 5:
# integer
n = 1 << (dtype - 2)
@ -752,98 +800,145 @@ class RawHeader(object):
num = multibyte(data)
print("Int(%d): %d" % (n, num))
pos += n
next = pos
expected_ofs = pos
elif dtype == 6:
# string (null terminated)
end = self.header.find(six.b('\0'), pos)
value = self.header[pos:end]
try:
print("String(%d): %r" % (end - pos, _decode_item(self.header[pos:end])))
except ValueError:
value = self.decode_bytes(value)
except Exception:
print('INVALID STRING')
print("String(%d): %r" % (end - pos, self.header[pos:end]))
raise
next = end + 1
print("String(%d): %r" % (end - pos, value))
expected_ofs = end + 1
elif dtype == 7:
print("Data: %s" % hex_string(self.header[pos:pos + count]))
next = pos + count
expected_ofs = pos + count
elif dtype == 8:
# string array
for i in range(count):
end = self.header.find(six.b('\0'), pos)
print("String(%d): %r" % (end - pos, self.header[pos:end]))
pos = end + 1
next = pos
elif dtype == 9:
# unicode string array
for i in range(count):
end = self.header.find(six.b('\0'), pos)
value = self.header[pos:end]
try:
print("i18n(%d): %r" % (end - pos, _decode_item(self.header[pos:end])))
value = self.decode_bytes(value)
except Exception:
print('INVALID STRING')
print("i18n(%d): %r" % (end - pos, self.header[pos:end]))
print("String(%d): %r" % (end - pos, value))
pos = end + 1
next = pos
expected_ofs = pos
elif dtype == 9:
# i18n string array
for i in range(count):
end = self.header.find(six.b('\0'), pos)
value = self.header[pos:end]
try:
value = self.decode_bytes(value)
except Exception:
print('INVALID STRING')
print("i18n(%d): %r" % (end - pos, value))
pos = end + 1
expected_ofs = pos
else:
print("Skipping data type %x" % dtype)
next = None
if next is not None:
print("Skipping data type 0x%x" % dtype)
expected_ofs = None
if expected_ofs is not None:
pos = store + self.datalen
if next < pos:
if expected_ofs < pos:
print("** HOLE at end of data block")
print("Hex: %s" % hex_string(self.header[next:pos]))
print("Data: %r" % self.header[next:pos])
elif pos > next:
print("Size: %d" % (pos - expected_ofs))
print("Hex: %s" % hex_string(self.header[expected_ofs:pos]))
print("Data: %r" % self.header[expected_ofs:pos])
print("Offset: 0x%x" % self.datalen)
elif pos > expected_ofs:
print("** OVERFLOW in data block")
print("Overflow size: %d" % (expected_ofs - pos))
print("Offset: 0x%x" % self.datalen)
def decode_bytes(self, value):
    """Return *value* as a native string.

    On Python 2 the bytes are handed back untouched. Otherwise they are
    decoded with the 'surrogateescape' error handler.
    """
    if not six.PY2:
        return value.decode(errors='surrogateescape')
    return value
def __getitem__(self, key):
    """Look up tag *key* in the index and return its parsed value.

    A key missing from self.index propagates whatever the index lookup
    raises. Decoding follows the instance default, since no explicit
    ``decode`` is passed on to _getitem().
    """
    found_tag, kind, rel_offset, n_items = self.index[key]
    # sanity check: each index entry is stored under its own tag
    assert found_tag == key
    return self._getitem(kind, rel_offset, n_items)
def _getitem(self, dtype, offset, count, decode=None):
    """Read one entry's value out of the header store.

    :param dtype: numeric rpm data type of the entry
    :param offset: offset of the entry's data, relative to the store
    :param count: number of items in the entry
    :param decode: if true, string values are passed through
                   decode_bytes(); None means use self.decode
    :returns: a list of ints for char/integer types, a single string for
              dtype 6, the raw bytes for dtype 7, and a list of strings
              for the string-array types (8 and 9)
    :raises GenericError: for any other data type
    """
    if decode is None:
        decode = self.decode
    # calculate start of store
    il = len(self.index)
    store = 16 + il * 16
    pos = store + offset
    if dtype >= 2 and dtype <= 5:
        # n-byte integers; the width depends only on dtype
        n = 1 << (dtype - 2)
        values = []
        for _ in range(count):
            data = [_ord(x) for x in self.header[pos:pos + n]]
            values.append(multibyte(data))
            pos += n
        return values
    elif dtype == 1:
        # char treated like int8
        return [_ord(c) for c in self.header[pos:pos + count]]
    elif dtype == 6:
        # string (null terminated)
        end = self.header.find(six.b('\0'), pos)
        value = self.header[pos:end]
        if decode:
            value = self.decode_bytes(value)
        return value
    elif dtype == 7:
        # raw data
        return self.header[pos:pos + count]
    elif dtype in (8, 9):
        # string array (8) and i18n string array (9) share one layout:
        # ``count`` consecutive null terminated strings.
        # note that we do not apply localization for the i18n case
        result = []
        for _ in range(count):
            end = self.header.find(six.b('\0'), pos)
            value = self.header[pos:end]
            if decode:
                value = self.decode_bytes(value)
            result.append(value)
            pos = end + 1
        return result
    else:
        raise GenericError("Unknown header data type: %x" % dtype)
def get(self, key, default=None, decode=None, single=False):
    """Return the value stored for tag *key*, or *default* if it is absent.

    :param key: numeric tag to look up in self.index
    :param default: returned when the tag is not in the index
    :param decode: passed through to _getitem(); None means use the
                   instance default
    :param single: if true and the value is a list, return its only
                   element instead of the list
    :raises ValueError: if ``single`` is set and the list does not hold
                        exactly one element
    """
    # With decode on, we will _mostly_ return the same value that rpmlib will.
    # There are exceptions where rpmlib will automatically translate or update values, e.g.
    # * fields that rpm treats as scalars
    # * special tags like Headerimmutable
    # * i18n string translations
    # * the Fileclass extension tag that overlaps a concrete tag
    # * auto converting PREINPROG/POSTINPROG/etc to string arrays for older rpms
    entry = self.index.get(key)
    if entry is None:
        return default
    # entry is (tag, dtype, offset, count); the tag itself is not needed here
    value = self._getitem(*entry[1:], decode=decode)
    if single and isinstance(value, list):
        if len(value) != 1:
            raise ValueError('single value requested for array at key %s' % key)
        return value[0]
    return value
def rip_rpm_sighdr(src):

View file

@ -45,11 +45,11 @@ class TestHeaderSizes(unittest.TestCase):
size = None
try:
tag = rpm.RPMTAG_LONGSIGSIZE
size = rh.get(tag)
size = rh.get(tag, single=True)
except NameError:
pass
if size is None:
size = rh.get(SIGTAG_SIZE)
size = rh.get(SIGTAG_SIZE, single=True)
# Expected file size
calc_size = s_lead + s_sig + size

View file

@ -0,0 +1,37 @@
# coding=utf-8
from __future__ import absolute_import
import os.path
import unittest
import koji
class TestRawHeaderFields(unittest.TestCase):
    """Compare RawHeader.get(decode=True) against the rpm header values."""

    # Append our context message to the default assertion output instead of
    # replacing it (this is not the default on every Python version).
    longMessage = True

    RPMFILES = [
        "test-deps-1-1.fc24.x86_64.rpm",
        "test-files-1-1.fc27.noarch.rpm",
        "test-nosrc-1-1.fc24.nosrc.rpm",
        "test-deps-1-1.fc24.x86_64.rpm.signed",
        "test-nopatch-1-1.fc24.nosrc.rpm",
        "test-src-1-1.fc24.src.rpm",
    ]

    # Tags excluded from the comparison.
    # NOTE(review): presumably Headerimmutable (63) and Fileclass (1141),
    # which rpmlib rewrites on access -- confirm against rpmtag.h
    SKIP_TAGS = (63, 1141)

    def test_header_sizes(self):
        """Every indexed tag should match what koji.get_rpm_header returns."""
        data_dir = os.path.join(os.path.dirname(__file__), 'data/rpms')
        for basename in self.RPMFILES:
            fn = os.path.join(data_dir, basename)

            rh = koji.RawHeader(koji.rip_rpm_hdr(fn))
            hdr = koji.get_rpm_header(fn)

            for key in rh.index:
                if key in self.SKIP_TAGS:
                    continue
                ours = rh.get(key, decode=True)
                theirs = hdr[key]
                # exact type comparison is intentional here
                if type(ours) != type(theirs):
                    if isinstance(ours, list) and len(ours) == 1 and ours[0] == theirs:
                        # rpm is presenting as a scalar
                        continue
                # otherwise
                self.assertEqual(ours, theirs,
                                 "tag %s differs in %s" % (key, basename))