test: '{. -> ./mod}/test_util_jsoncomm.py'
Move the jsoncomm-test to the other module-level tests and align its implementation with the others.
This commit is contained in:
parent
2e039a778c
commit
1ec3e5a776
1 changed files with 1 additions and 5 deletions
155
test/mod/test_util_jsoncomm.py
Normal file
155
test/mod/test_util_jsoncomm.py
Normal file
|
|
@ -0,0 +1,155 @@
|
|||
#
|
||||
# Tests for the 'osbuild.util.jsoncomm' module.
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from osbuild.util import jsoncomm
|
||||
|
||||
|
||||
class TestUtilJsonComm(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.dir = tempfile.TemporaryDirectory()
|
||||
self.address = os.path.join(self.dir.name, "listener")
|
||||
self.server = jsoncomm.Socket.new_server(self.address)
|
||||
self.client = jsoncomm.Socket.new_client(self.address)
|
||||
|
||||
def tearDown(self):
|
||||
self.client.close()
|
||||
self.server.close()
|
||||
self.dir.cleanup()
|
||||
|
||||
def test_fdset(self):
|
||||
#
|
||||
# Test the FdSet implementation. Create a simple FD array and verify
|
||||
# that the FdSet correctly indexes them. Furthermore, verify that a
|
||||
# close actually closes the Fds so a following FdSet will get the same
|
||||
# FD numbers assigned.
|
||||
#
|
||||
|
||||
v1 = [os.dup(0), os.dup(0), os.dup(0), os.dup(0)]
|
||||
s = jsoncomm.FdSet.from_list(v1)
|
||||
assert len(s) == 4
|
||||
for i in range(4):
|
||||
assert s[i] == v1[i]
|
||||
with self.assertRaises(IndexError):
|
||||
_ = s[128]
|
||||
s.close()
|
||||
|
||||
v2 = [os.dup(0), os.dup(0), os.dup(0), os.dup(0)]
|
||||
assert v1 == v2
|
||||
s = jsoncomm.FdSet.from_list(v2)
|
||||
s.close()
|
||||
|
||||
def test_fdset_init(self):
|
||||
#
|
||||
# Test FdSet initializations. This includes common edge-cases like empty
|
||||
# initializers, invalid array values, or invalid types.
|
||||
#
|
||||
|
||||
s = jsoncomm.FdSet.from_list([])
|
||||
s.close()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
v1 = [-1]
|
||||
s = jsoncomm.FdSet.from_list(v1)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
v1 = ["foobar"]
|
||||
s = jsoncomm.FdSet(rawfds=v1)
|
||||
|
||||
def test_ping_pong(self):
|
||||
#
|
||||
# Test sending messages through the client/server connection.
|
||||
#
|
||||
|
||||
data = {"key": "value"}
|
||||
self.client.send(data)
|
||||
msg = self.server.recv()
|
||||
assert msg[0] == data
|
||||
assert len(msg[1]) == 0
|
||||
|
||||
self.server.send(data, destination=msg[2])
|
||||
msg = self.client.recv()
|
||||
assert msg[0] == data
|
||||
assert len(msg[1]) == 0
|
||||
|
||||
def test_scm_rights(self):
|
||||
#
|
||||
# Test FD transmission. Create a file, send a file-descriptor through
|
||||
# the communication channel, and then verify that the file-contents
|
||||
# can be read.
|
||||
#
|
||||
|
||||
with tempfile.TemporaryFile() as f1:
|
||||
f1.write(b"foobar")
|
||||
f1.seek(0)
|
||||
|
||||
self.client.send({}, fds=[f1.fileno()])
|
||||
|
||||
msg = self.server.recv()
|
||||
assert msg[0] == {}
|
||||
assert len(msg[1]) == 1
|
||||
with os.fdopen(msg[1].steal(0)) as f2:
|
||||
assert f2.read() == "foobar"
|
||||
|
||||
def test_listener_cleanup(self):
|
||||
#
|
||||
# Verify that only a single server can listen on a specified address.
|
||||
# Then make sure closing a server will correctly unlink its socket.
|
||||
#
|
||||
|
||||
addr = os.path.join(self.dir.name, "foobar")
|
||||
srv1 = jsoncomm.Socket.new_server(addr)
|
||||
with self.assertRaises(OSError):
|
||||
srv2 = jsoncomm.Socket.new_server(addr)
|
||||
srv1.close()
|
||||
srv2 = jsoncomm.Socket.new_server(addr)
|
||||
srv2.close()
|
||||
|
||||
def test_contextlib(self):
|
||||
#
|
||||
# Verify the context-manager of sockets. Make sure they correctly close
|
||||
# the socket, and they correctly propagate exceptions.
|
||||
#
|
||||
|
||||
assert self.client.fileno() >= 0
|
||||
with self.client as client:
|
||||
assert client == self.client
|
||||
assert client.fileno() >= 0
|
||||
with self.assertRaises(AssertionError):
|
||||
self.client.fileno()
|
||||
|
||||
assert self.server.fileno() >= 0
|
||||
with self.assertRaises(SystemError):
|
||||
with self.server as server:
|
||||
assert server.fileno() >= 0
|
||||
raise SystemError
|
||||
raise AssertionError
|
||||
with self.assertRaises(AssertionError):
|
||||
self.server.fileno()
|
||||
|
||||
def test_asyncio(self):
|
||||
#
|
||||
# Test integration with asyncio-eventloops. Use a trivial echo server
|
||||
# and test a simple ping/pong roundtrip.
|
||||
#
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
|
||||
def echo(socket):
|
||||
msg = socket.recv()
|
||||
socket.send(msg[0], destination=msg[2])
|
||||
loop.stop()
|
||||
|
||||
self.client.send({})
|
||||
|
||||
loop.add_reader(self.server, echo, self.server)
|
||||
loop.run_forever()
|
||||
loop.close()
|
||||
|
||||
msg = self.client.recv()
|
||||
assert msg[0] == {}
|
||||
Loading…
Add table
Add a link
Reference in a new issue