api: add high level message dispatcher

Availability of new incoming data is indicated to clients, i.e.
deriving classes, by invoking the `_dispatch` method, with the
`jsoncomm.Socket` as argument. All clients then need to call
`Socket.recv` to actually receive the data.
Provide a new high-level message dispatcher class by providing
a standard implementation of `_dispatch` in `BaseAPI` that calls
`socket.revc` and then invokes the new high level `_message`
method, with the data (`msg`), file descriptors (`fds`, if passed)
the socket (`sock`) and the peer address `addr`.
This commit is contained in:
Christian Kellner 2020-07-24 19:58:28 +02:00 committed by Tom Gundersen
parent aebff47908
commit aa07c5ec82

View file

@ -6,7 +6,7 @@ import os
import sys
import tempfile
import threading
from typing import Optional
from typing import Dict, Optional
from .util.types import PathLike
from .util import jsoncomm
@ -26,11 +26,16 @@ class BaseAPI(abc.ABC):
established on entering the context and will be shut down
when the context is left.
The `_dispatch` method needs to be implemented by child
classes, and is called for incoming messages.
New messages are delivered via the `_message` method, that
is meant to be implemented by deriving classes.
Optionally, the `_cleanup` method can be implemented, to
clean up resources after the context is left and the
communication channel shut down.
On incoming messages, first the `_dispatch` method will be
called; the default implementation will receive the message
call `_message.`
"""
def __init__(self, socket_address: Optional[PathLike] = None):
self.socket_address = socket_address
@ -45,9 +50,8 @@ class BaseAPI(abc.ABC):
def endpoint(cls):
"""The name of the API endpoint"""
@abc.abstractmethod
def _dispatch(self, server):
"""Called for incoming messages on the socket"""
def _message(self, msg: Dict, fds: jsoncomm.FdSet, sock: jsoncomm.Socket, addr: str):
"""Called for a new incoming message"""
def _cleanup(self):
"""Called after the event loop is shut down"""
@ -57,6 +61,11 @@ class BaseAPI(abc.ABC):
"""Called to create the temporary socket dir"""
return tempfile.TemporaryDirectory(prefix="api-", dir="/run/osbuild")
def _dispatch(self, sock: jsoncomm.Socket):
"""Called when data is available on the socket"""
msg, fds, addr = sock.recv()
self._message(msg, fds, sock, addr)
def _run_event_loop(self):
with jsoncomm.Socket.new_server(self.socket_address) as server:
self.barrier.wait()