PR#4370: adjust upload offset and overwrite logic
Merges #4370 https://pagure.io/koji/pull-request/4370 Fixes: #4371 https://pagure.io/koji/issue/4371 Surprising upload failure on koji.fp.o
This commit is contained in:
commit
c44a33ceb6
3 changed files with 357 additions and 4 deletions
|
|
@ -16277,8 +16277,10 @@ def handle_upload(environ):
|
|||
start = time.time()
|
||||
if not context.session.logged_in:
|
||||
raise koji.ActionNotAllowed('you must be logged-in to upload a file')
|
||||
|
||||
# read upload parameters
|
||||
args = parse_qs(environ.get('QUERY_STRING', ''), strict_parsing=True)
|
||||
# XXX - already parsed by auth
|
||||
# TODO - unify with earlier query parsing in auth?
|
||||
name = args['filename'][0]
|
||||
path = args.get('filepath', ('',))[0]
|
||||
verify = args.get('fileverify', ('',))[0]
|
||||
|
|
@ -16286,26 +16288,50 @@ def handle_upload(environ):
|
|||
offset = args.get('offset', ('0',))[0]
|
||||
offset = int(offset)
|
||||
volume = args.get('volume', ('DEFAULT',))[0]
|
||||
|
||||
# check upload destination
|
||||
fn = get_upload_path(path, name, create=True, volume=volume)
|
||||
if os.path.exists(fn):
|
||||
if not os.path.isfile(fn):
|
||||
try:
|
||||
st = os.lstat(fn)
|
||||
except FileNotFoundError:
|
||||
st = None
|
||||
if st:
|
||||
if stat.S_ISLNK(st.st_mode):
|
||||
# upload paths should never by symlinks
|
||||
raise koji.GenericError("destination is a symlink: %s" % fn)
|
||||
if not stat.S_ISREG(st.st_mode):
|
||||
raise koji.GenericError("destination not a file: %s" % fn)
|
||||
if offset == 0 and not overwrite:
|
||||
raise koji.GenericError("upload path exists: %s" % fn)
|
||||
|
||||
# handle the upload
|
||||
chksum = get_verify_class(verify)()
|
||||
size = 0
|
||||
inf = environ['wsgi.input']
|
||||
fd = os.open(fn, os.O_RDWR | os.O_CREAT, 0o666)
|
||||
try:
|
||||
# acquire lock
|
||||
try:
|
||||
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except IOError as e:
|
||||
raise koji.LockError(e)
|
||||
|
||||
# handle offset
|
||||
if offset == -1:
|
||||
# append to end of file
|
||||
offset = os.lseek(fd, 0, 2)
|
||||
else:
|
||||
elif overwrite:
|
||||
# if we're overwriting an older upload (e.g. logs for restarted task)
|
||||
# then truncate the file at the write offset
|
||||
os.ftruncate(fd, offset)
|
||||
os.lseek(fd, offset, 0)
|
||||
else:
|
||||
# if we're not overwriting, then do not truncate
|
||||
# note that call may be a retry, so we may be writing over an existing chunk
|
||||
os.lseek(fd, offset, 0)
|
||||
# in this case we have an additional check after the loop
|
||||
|
||||
# i/o loop
|
||||
while True:
|
||||
try:
|
||||
chunk = inf.read(65536)
|
||||
|
|
@ -16326,9 +16352,21 @@ def handle_upload(environ):
|
|||
if verify:
|
||||
chksum.update(chunk)
|
||||
os.write(fd, chunk)
|
||||
|
||||
# offset check
|
||||
if not overwrite:
|
||||
# end of file should match where we think we are
|
||||
flen = os.lseek(fd, 0, 2)
|
||||
expected = offset + size
|
||||
if flen != expected:
|
||||
raise koji.GenericError(f"Incorrect upload length for {fn} - "
|
||||
f"Expected {expected}, actual {flen}")
|
||||
|
||||
finally:
|
||||
# this will also remove our lock
|
||||
os.close(fd)
|
||||
|
||||
# return some basic stats for client verification
|
||||
ret = {
|
||||
'size': size,
|
||||
'fileverify': verify,
|
||||
|
|
|
|||
221
tests/test_hub/test_handle_upload.py
Normal file
221
tests/test_hub/test_handle_upload.py
Normal file
|
|
@ -0,0 +1,221 @@
|
|||
import os
|
||||
import io
|
||||
from unittest import mock
|
||||
import shutil
|
||||
import tempfile
|
||||
import urllib.parse
|
||||
import unittest
|
||||
|
||||
from kojihub import kojihub
|
||||
import koji
|
||||
|
||||
|
||||
class TestHandleUpload(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.tempdir = tempfile.mkdtemp()
|
||||
self.pathinfo = koji.PathInfo(self.tempdir)
|
||||
mock.patch('koji.pathinfo', new=self.pathinfo).start()
|
||||
self.lookup_name = mock.patch('kojihub.kojihub.lookup_name').start()
|
||||
self.context = mock.patch('kojihub.kojihub.context').start()
|
||||
self.context.session.logged_in = True
|
||||
self.context.session.user_id = 1
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir)
|
||||
mock.patch.stopall()
|
||||
|
||||
def test_simple_upload(self):
|
||||
environ = {}
|
||||
args = {
|
||||
'filename': 'hello.txt',
|
||||
'filepath': 'FOO',
|
||||
'fileverify': 'adler32',
|
||||
'offset': '0',
|
||||
}
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
contents = b'hello world\n'
|
||||
environ['wsgi.input'] = io.BytesIO(contents)
|
||||
|
||||
# upload
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# verify
|
||||
fn = f'{self.tempdir}/work/FOO/hello.txt'
|
||||
self.assertEqual(contents, open(fn, 'rb').read())
|
||||
|
||||
def test_no_overwrite(self):
|
||||
environ = {}
|
||||
args = {
|
||||
# overwrite should default to False
|
||||
'filename': 'hello.txt',
|
||||
'filepath': 'FOO',
|
||||
'fileverify': 'adler32',
|
||||
'offset': '0',
|
||||
}
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
contents = b'hello world\n'
|
||||
environ['wsgi.input'] = io.BytesIO(contents)
|
||||
fn = f'{self.tempdir}/work/FOO/hello.txt'
|
||||
koji.ensuredir(os.path.dirname(fn))
|
||||
with open(fn, 'wt') as fp:
|
||||
fp.write('already exists')
|
||||
|
||||
# upload
|
||||
with self.assertRaises(koji.GenericError) as ex:
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# verify error
|
||||
self.assertIn('upload path exists', str(ex.exception))
|
||||
|
||||
def test_no_symlink(self):
|
||||
environ = {}
|
||||
args = {
|
||||
# overwrite should default to False
|
||||
'filename': 'hello.txt',
|
||||
'filepath': 'FOO',
|
||||
'fileverify': 'adler32',
|
||||
'offset': '0',
|
||||
}
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
fn = f'{self.tempdir}/work/FOO/hello.txt'
|
||||
koji.ensuredir(os.path.dirname(fn))
|
||||
os.symlink('link_target', fn)
|
||||
|
||||
# upload
|
||||
with self.assertRaises(koji.GenericError) as ex:
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# verify error
|
||||
self.assertIn('destination is a symlink', str(ex.exception))
|
||||
|
||||
def test_no_nonfile(self):
|
||||
environ = {}
|
||||
args = {
|
||||
# overwrite should default to False
|
||||
'filename': 'hello.txt',
|
||||
'filepath': 'FOO',
|
||||
'fileverify': 'adler32',
|
||||
'offset': '0',
|
||||
}
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
fn = f'{self.tempdir}/work/FOO/hello.txt'
|
||||
koji.ensuredir(os.path.dirname(fn))
|
||||
os.mkdir(fn)
|
||||
|
||||
# upload
|
||||
with self.assertRaises(koji.GenericError) as ex:
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# verify error
|
||||
self.assertIn('destination not a file', str(ex.exception))
|
||||
|
||||
def test_login_required(self):
|
||||
environ = {}
|
||||
self.context.session.logged_in = False
|
||||
|
||||
with self.assertRaises(koji.ActionNotAllowed):
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
def test_retry(self):
|
||||
# uploading the same chunk twice should be fine
|
||||
environ = {}
|
||||
args = {
|
||||
'filename': 'hello.txt',
|
||||
'filepath': 'FOO',
|
||||
'fileverify': 'adler32',
|
||||
'offset': '0',
|
||||
}
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
contents = b'hello world\nthis is line two'
|
||||
chunks = contents.splitlines(keepends=True)
|
||||
|
||||
# chunk 0
|
||||
environ['wsgi.input'] = io.BytesIO(chunks[0])
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# chunk 1
|
||||
environ['wsgi.input'] = io.BytesIO(chunks[1])
|
||||
args['offset'] = str(len(chunks[0]))
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# chunk 1, again
|
||||
environ['wsgi.input'] = io.BytesIO(chunks[1])
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# verify
|
||||
fn = f'{self.tempdir}/work/FOO/hello.txt'
|
||||
self.assertEqual(contents, open(fn, 'rb').read())
|
||||
|
||||
def test_no_truncate(self):
|
||||
# uploading a chunk out of order without overwrite should:
|
||||
# 1. not truncate
|
||||
# 2. error
|
||||
environ = {}
|
||||
args = {
|
||||
'filename': 'hello.txt',
|
||||
'filepath': 'FOO',
|
||||
'fileverify': 'adler32',
|
||||
'offset': '0',
|
||||
}
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
contents = b'hello world\nthis is line two\nthis is line three'
|
||||
chunks = contents.splitlines(keepends=True)
|
||||
|
||||
# chunk 0
|
||||
environ['wsgi.input'] = io.BytesIO(chunks[0])
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# chunk 1
|
||||
environ['wsgi.input'] = io.BytesIO(chunks[1])
|
||||
args['offset'] = str(len(chunks[0]))
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# chunk 2
|
||||
environ['wsgi.input'] = io.BytesIO(chunks[2])
|
||||
args['offset'] = str(len(chunks[0]) + len(chunks[1]))
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# chunk 1, again
|
||||
environ['wsgi.input'] = io.BytesIO(chunks[1])
|
||||
args['offset'] = str(len(chunks[0]))
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
with self.assertRaises(koji.GenericError) as ex:
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# verify
|
||||
self.assertIn('Incorrect upload length', str(ex.exception))
|
||||
# previous upload contents should still be there
|
||||
fn = f'{self.tempdir}/work/FOO/hello.txt'
|
||||
self.assertEqual(contents, open(fn, 'rb').read())
|
||||
|
||||
def test_truncate(self):
|
||||
# uploading a chunk with overwrite SHOULD truncate:
|
||||
environ = {}
|
||||
args = {
|
||||
'filename': 'hello.txt',
|
||||
'filepath': 'FOO',
|
||||
'fileverify': 'adler32',
|
||||
'offset': '0',
|
||||
}
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
contents1 = b'hello world\nthis is line two\nthis is line three'
|
||||
contents2 = b'hello world\n'
|
||||
|
||||
# pass1
|
||||
environ['wsgi.input'] = io.BytesIO(contents1)
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# pass2
|
||||
args['overwrite'] = '1'
|
||||
environ['QUERY_STRING'] = urllib.parse.urlencode(args)
|
||||
environ['wsgi.input'] = io.BytesIO(contents2)
|
||||
kojihub.handle_upload(environ)
|
||||
|
||||
# verify
|
||||
fn = f'{self.tempdir}/work/FOO/hello.txt'
|
||||
self.assertEqual(contents2, open(fn, 'rb').read())
|
||||
# the end
|
||||
94
tests/test_hub/test_upload.py
Normal file
94
tests/test_hub/test_upload.py
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
import io
|
||||
import shutil
|
||||
import tempfile
|
||||
import urllib.parse
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
import koji
|
||||
from kojihub import kojihub
|
||||
|
||||
"""
|
||||
This test involves both client and hub code.
|
||||
Since hub code has higher requirements, we group it there.
|
||||
"""
|
||||
|
||||
|
||||
def get_callMethod(self):
|
||||
# create a function to replace ClientSession._callMethod
|
||||
# self is the session instance
|
||||
|
||||
def my_callMethod(name, args, kwargs=None, retry=True):
|
||||
# we only handle the methods that fastUpload will use
|
||||
handler, headers, request = self._prepCall(name, args, kwargs)
|
||||
self.retries = 0
|
||||
if name == 'rawUpload':
|
||||
parts = urllib.parse.urlparse(handler)
|
||||
query = parts[4]
|
||||
environ = {
|
||||
'QUERY_STRING': query,
|
||||
'wsgi.input': io.BytesIO(request),
|
||||
'CONTENT_LENGTH': len(request),
|
||||
}
|
||||
return kojihub.handle_upload(environ)
|
||||
elif name == 'checkUpload':
|
||||
exports = kojihub.RootExports()
|
||||
return exports.checkUpload(*args, **kwargs)
|
||||
# else
|
||||
raise ValueError(f'Unexected call {name}')
|
||||
|
||||
return my_callMethod
|
||||
|
||||
|
||||
class TestHandleUpload(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.tempdir = tempfile.mkdtemp()
|
||||
self.pathinfo = koji.PathInfo(self.tempdir)
|
||||
mock.patch('koji.pathinfo', new=self.pathinfo).start()
|
||||
self.lookup_name = mock.patch('kojihub.kojihub.lookup_name').start()
|
||||
self.context = mock.patch('kojihub.kojihub.context').start()
|
||||
self.context.session.logged_in = True
|
||||
self.context.session.user_id = 1
|
||||
self.session = koji.ClientSession('https://koji.example.com/NOTUSED')
|
||||
self.session._callMethod = get_callMethod(self.session)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir)
|
||||
mock.patch.stopall()
|
||||
|
||||
def test_upload_client(self):
|
||||
# write a test file
|
||||
contents = b'Hello World. Upload me.\n' * 100
|
||||
orig = f'{self.tempdir}/orig.txt'
|
||||
with open(orig, 'wb') as fp:
|
||||
fp.write(contents)
|
||||
sinfo = {'session-id': '123', 'session-key': '456', 'callnum': 1}
|
||||
self.session.setSession(sinfo) # marks logged in
|
||||
|
||||
self.session.fastUpload(orig, 'testpath', blocksize=137)
|
||||
|
||||
# files should be identical
|
||||
dup = f'{self.tempdir}/work/testpath/orig.txt'
|
||||
with open(dup, 'rb') as fp:
|
||||
check = fp.read()
|
||||
self.assertEqual(check, contents)
|
||||
|
||||
def test_upload_client_overwrite(self):
|
||||
# same as above, but with overwrite flag
|
||||
contents = b'Hello World. Upload me.\n' * 100
|
||||
orig = f'{self.tempdir}/orig.txt'
|
||||
with open(orig, 'wb') as fp:
|
||||
fp.write(contents)
|
||||
sinfo = {'session-id': '123', 'session-key': '456', 'callnum': 1}
|
||||
self.session.setSession(sinfo) # marks logged in
|
||||
|
||||
self.session.fastUpload(orig, 'testpath', blocksize=137, overwrite=True)
|
||||
|
||||
# files should be identical
|
||||
dup = f'{self.tempdir}/work/testpath/orig.txt'
|
||||
with open(dup, 'rb') as fp:
|
||||
check = fp.read()
|
||||
self.assertEqual(check, contents)
|
||||
|
||||
# the end
|
||||
Loading…
Add table
Add a link
Reference in a new issue