Add a new module that implements a simple JSON communication channel. This is meant to replace all our hard-coded SOCK_DGRAM code that is copied all over the place. This is intentionally left simple. It only supports synchronous operations, trivial JSON encoding and decoding, and uses a message-based transport mode.
159 lines
4.5 KiB
Python
159 lines
4.5 KiB
Python
#
|
|
# Tests for the 'osbuild.util.jsoncomm' module.
|
|
#
|
|
|
|
|
|
import asyncio
|
|
import os
|
|
import tempfile
|
|
import unittest
|
|
from osbuild.util import jsoncomm
|
|
|
|
|
|
class TestUtilJsonComm(unittest.TestCase):
|
|
def setUp(self):
|
|
self.dir = tempfile.TemporaryDirectory()
|
|
self.address = os.path.join(self.dir.name, "listener")
|
|
self.server = jsoncomm.Socket.new_server(self.address)
|
|
self.client = jsoncomm.Socket.new_client(self.address)
|
|
|
|
def tearDown(self):
|
|
self.client.close()
|
|
self.server.close()
|
|
self.dir.cleanup()
|
|
|
|
def test_fdset(self):
|
|
#
|
|
# Test the FdSet implementation. Create a simple FD array and verify
|
|
# that the FdSet correctly indexes them. Furthermore, verify that a
|
|
# close actually closes the Fds so a following FdSet will get the same
|
|
# FD numbers assigned.
|
|
#
|
|
|
|
v1 = [os.dup(0), os.dup(0), os.dup(0), os.dup(0)]
|
|
s = jsoncomm.FdSet.from_list(v1)
|
|
assert len(s) == 4
|
|
for i in range(4):
|
|
assert s[i] == v1[i]
|
|
with self.assertRaises(IndexError):
|
|
_ = s[128]
|
|
s.close()
|
|
|
|
v2 = [os.dup(0), os.dup(0), os.dup(0), os.dup(0)]
|
|
assert v1 == v2
|
|
s = jsoncomm.FdSet.from_list(v2)
|
|
s.close()
|
|
|
|
def test_fdset_init(self):
|
|
#
|
|
# Test FdSet initializations. This includes common edge-cases like empty
|
|
# initializers, invalid array values, or invalid types.
|
|
#
|
|
|
|
s = jsoncomm.FdSet.from_list([])
|
|
s.close()
|
|
|
|
with self.assertRaises(ValueError):
|
|
v1 = [-1]
|
|
s = jsoncomm.FdSet.from_list(v1)
|
|
|
|
with self.assertRaises(ValueError):
|
|
v1 = ["foobar"]
|
|
s = jsoncomm.FdSet(rawfds=v1)
|
|
|
|
def test_ping_pong(self):
|
|
#
|
|
# Test sending messages through the client/server connection.
|
|
#
|
|
|
|
data = {"key": "value"}
|
|
self.client.send(data)
|
|
msg = self.server.recv()
|
|
assert msg[0] == data
|
|
assert len(msg[1]) == 0
|
|
|
|
self.server.send(data, destination=msg[2])
|
|
msg = self.client.recv()
|
|
assert msg[0] == data
|
|
assert len(msg[1]) == 0
|
|
|
|
def test_scm_rights(self):
|
|
#
|
|
# Test FD transmission. Create a file, send a file-descriptor through
|
|
# the communication channel, and then verify that the file-contents
|
|
# can be read.
|
|
#
|
|
|
|
with tempfile.TemporaryFile() as f1:
|
|
f1.write(b"foobar")
|
|
f1.seek(0)
|
|
|
|
self.client.send({}, fds=[f1.fileno()])
|
|
|
|
msg = self.server.recv()
|
|
assert msg[0] == {}
|
|
assert len(msg[1]) == 1
|
|
with os.fdopen(msg[1].steal(0)) as f2:
|
|
assert f2.read() == "foobar"
|
|
|
|
def test_listener_cleanup(self):
|
|
#
|
|
# Verify that only a single server can listen on a specified address.
|
|
# Then make sure closing a server will correctly unlink its socket.
|
|
#
|
|
|
|
addr = os.path.join(self.dir.name, "foobar")
|
|
srv1 = jsoncomm.Socket.new_server(addr)
|
|
with self.assertRaises(OSError):
|
|
srv2 = jsoncomm.Socket.new_server(addr)
|
|
srv1.close()
|
|
srv2 = jsoncomm.Socket.new_server(addr)
|
|
srv2.close()
|
|
|
|
def test_contextlib(self):
|
|
#
|
|
# Verify the context-manager of sockets. Make sure they correctly close
|
|
# the socket, and they correctly propagate exceptions.
|
|
#
|
|
|
|
assert self.client.fileno() >= 0
|
|
with self.client as client:
|
|
assert client == self.client
|
|
assert client.fileno() >= 0
|
|
with self.assertRaises(AssertionError):
|
|
self.client.fileno()
|
|
|
|
assert self.server.fileno() >= 0
|
|
with self.assertRaises(SystemError):
|
|
with self.server as server:
|
|
assert server.fileno() >= 0
|
|
raise SystemError
|
|
raise AssertionError
|
|
with self.assertRaises(AssertionError):
|
|
self.server.fileno()
|
|
|
|
def test_asyncio(self):
|
|
#
|
|
# Test integration with asyncio-eventloops. Use a trivial echo server
|
|
# and test a simple ping/pong roundtrip.
|
|
#
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
def echo(socket):
|
|
msg = socket.recv()
|
|
socket.send(msg[0], destination=msg[2])
|
|
loop.stop()
|
|
|
|
self.client.send({})
|
|
|
|
loop.add_reader(self.server, echo, self.server)
|
|
loop.run_forever()
|
|
loop.close()
|
|
|
|
msg = self.client.recv()
|
|
assert msg[0] == {}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|