test/assembler: use new loop_for_fd helper
Use the new `loop_for_fd` helper that got added to `LoopControl`, which is basically a version of the existing code.
This commit is contained in:
parent
6293da5874
commit
2c64c65608
1 changed files with 7 additions and 18 deletions
|
|
@ -3,7 +3,6 @@
|
|||
#
|
||||
|
||||
import contextlib
|
||||
import errno
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
|
|
@ -248,26 +247,16 @@ class TestAssemblers(test.TestBase):
|
|||
|
||||
@contextlib.contextmanager
|
||||
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
|
||||
while True:
|
||||
lo = loop.Loop(ctl.get_unbound())
|
||||
try:
|
||||
lo.set_fd(fd)
|
||||
except OSError as e:
|
||||
lo.close()
|
||||
if e.errno == errno.EBUSY:
|
||||
continue
|
||||
raise e
|
||||
try:
|
||||
lo.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
|
||||
except BlockingIOError:
|
||||
lo.clear_fd()
|
||||
lo.close()
|
||||
continue
|
||||
break
|
||||
lo = None
|
||||
try:
|
||||
lo = ctl.loop_for_fd(fd,
|
||||
offset=offset,
|
||||
sizelimit=sizelimit,
|
||||
autoclear=True)
|
||||
yield lo
|
||||
finally:
|
||||
lo.close()
|
||||
if lo:
|
||||
lo.close()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue