osbuild: convert to jsoncomm
Convert the hard-coded DGRAM communication to util.jsoncomm. This avoids hard-coding any IPC-details and simplifies the callers quite a bit.
This commit is contained in:
parent
6f8ba82fc6
commit
4ad4da4658
10 changed files with 79 additions and 184 deletions
|
|
@ -3,7 +3,6 @@
|
|||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import shutil
|
||||
import struct
|
||||
import subprocess
|
||||
|
|
@ -685,10 +684,5 @@ def main(tree, output_dir, options, loop_client):
|
|||
|
||||
if __name__ == '__main__':
|
||||
args = json.load(sys.stdin)
|
||||
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
sock.connect("/run/osbuild/api/remoteloop")
|
||||
ret = main(args["tree"], args["output_dir"], args["options"], remoteloop.LoopClient(sock))
|
||||
|
||||
ret = main(args["tree"], args["output_dir"], args["options"], remoteloop.LoopClient("/run/osbuild/api/remoteloop"))
|
||||
sys.exit(ret)
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import osbuild.remoteloop as remoteloop
|
||||
|
|
@ -97,10 +96,5 @@ def main(tree, output_dir, options, loop_client):
|
|||
|
||||
if __name__ == '__main__':
|
||||
args = json.load(sys.stdin)
|
||||
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
sock.connect("/run/osbuild/api/remoteloop")
|
||||
r = main(args["tree"], args["output_dir"], args["options"], remoteloop.LoopClient(sock))
|
||||
|
||||
r = main(args["tree"], args["output_dir"], args["options"], remoteloop.LoopClient("/run/osbuild/api/remoteloop"))
|
||||
sys.exit(r)
|
||||
|
|
|
|||
|
|
@ -1,14 +1,10 @@
|
|||
import array
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
|
||||
|
||||
from . import remoteloop
|
||||
from .util import jsoncomm
|
||||
|
||||
|
||||
class API:
|
||||
|
|
@ -39,34 +35,32 @@ class API:
|
|||
self._output = os.fdopen(fd)
|
||||
return out
|
||||
|
||||
def _setup_stdio(self, sock, addr):
|
||||
def _setup_stdio(self, server, addr):
|
||||
with self._prepare_input() as stdin, \
|
||||
self._prepare_output() as stdout:
|
||||
msg = {}
|
||||
fds = array.array("i")
|
||||
fds = []
|
||||
fds.append(stdin.fileno())
|
||||
msg['stdin'] = 0
|
||||
fds.append(stdout.fileno())
|
||||
msg['stdout'] = 1
|
||||
fds.append(stdout.fileno())
|
||||
msg['stderr'] = 2
|
||||
remoteloop.dump_fds(sock, msg, fds, addr=addr)
|
||||
|
||||
def _dispatch(self, sock):
|
||||
msg, addr = sock.recvfrom(1024)
|
||||
args = json.loads(msg)
|
||||
if args["method"] == 'setup-stdio':
|
||||
self._setup_stdio(sock, addr)
|
||||
server.send(msg, fds=fds, destination=addr)
|
||||
|
||||
def _dispatch(self, server):
|
||||
msg, _, addr = server.recv()
|
||||
if msg["method"] == 'setup-stdio':
|
||||
self._setup_stdio(server, addr)
|
||||
|
||||
def _run_event_loop(self):
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
sock.bind(self.socket_address)
|
||||
self.barrier.wait()
|
||||
self.event_loop.add_reader(sock, self._dispatch, sock)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(sock)
|
||||
sock.close()
|
||||
with jsoncomm.Socket.new_server(self.socket_address) as server:
|
||||
self.barrier.wait()
|
||||
self.event_loop.add_reader(server, self._dispatch, server)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(server)
|
||||
|
||||
def __enter__(self):
|
||||
self.thread.start()
|
||||
|
|
|
|||
|
|
@ -1,12 +1,10 @@
|
|||
import array
|
||||
import asyncio
|
||||
import contextlib
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
from . import loop
|
||||
from .util import jsoncomm
|
||||
|
||||
|
||||
__all__ = [
|
||||
|
|
@ -15,21 +13,6 @@ __all__ = [
|
|||
]
|
||||
|
||||
|
||||
def load_fds(sock, msglen):
|
||||
fds = array.array("i") # Array of ints
|
||||
msg, ancdata, _, addr = sock.recvmsg(msglen, socket.CMSG_LEN(253 * fds.itemsize))
|
||||
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||
if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS):
|
||||
# Append data, ignoring any truncated integers at the end.
|
||||
fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
|
||||
return json.loads(msg), list(fds), addr
|
||||
|
||||
|
||||
def dump_fds(sock, obj, fds, flags=0, addr=None):
|
||||
ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))]
|
||||
sock.sendmsg([json.dumps(obj).encode('utf-8')], ancillary, flags, addr)
|
||||
|
||||
|
||||
class LoopServer:
|
||||
"""Server for creating loopback devices
|
||||
|
||||
|
|
@ -90,8 +73,8 @@ class LoopServer:
|
|||
self.devs.append(lo)
|
||||
return lo.devname
|
||||
|
||||
def _dispatch(self, sock):
|
||||
args, fds, addr = load_fds(sock, 1024)
|
||||
def _dispatch(self, server):
|
||||
args, fds, addr = server.recv()
|
||||
|
||||
fd = fds[args["fd"]]
|
||||
dir_fd = fds[args["dir_fd"]]
|
||||
|
|
@ -99,18 +82,16 @@ class LoopServer:
|
|||
sizelimit = args.get("sizelimit")
|
||||
|
||||
devname = self._create_device(fd, dir_fd, offset, sizelimit)
|
||||
ret = json.dumps({"devname": devname})
|
||||
sock.sendto(ret.encode('utf-8'), addr)
|
||||
server.send({"devname": devname}, destination=addr)
|
||||
fds.close()
|
||||
|
||||
def _run_event_loop(self):
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
sock.bind(self.socket_address)
|
||||
self.barrier.wait()
|
||||
self.event_loop.add_reader(sock, self._dispatch, sock)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(sock)
|
||||
sock.close()
|
||||
with jsoncomm.Socket.new_server(self.socket_address) as server:
|
||||
self.barrier.wait()
|
||||
self.event_loop.add_reader(server, self._dispatch, server)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(server)
|
||||
|
||||
def __enter__(self):
|
||||
self.thread.start()
|
||||
|
|
@ -125,13 +106,19 @@ class LoopServer:
|
|||
|
||||
|
||||
class LoopClient:
|
||||
def __init__(self, sock):
|
||||
self.sock = sock
|
||||
client = None
|
||||
|
||||
def __init__(self, connect_to):
|
||||
self.client = jsoncomm.Socket.new_client(connect_to)
|
||||
|
||||
def __del__(self):
|
||||
if self.client is not None:
|
||||
self.client.close()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def device(self, filename, offset=None, sizelimit=None):
|
||||
req = {}
|
||||
fds = array.array("i")
|
||||
fds = []
|
||||
|
||||
fd = os.open(filename, os.O_RDWR)
|
||||
dir_fd = os.open("/dev", os.O_DIRECTORY)
|
||||
|
|
@ -146,12 +133,12 @@ class LoopClient:
|
|||
if sizelimit:
|
||||
req["sizelimit"] = sizelimit
|
||||
|
||||
dump_fds(self.sock, req, fds)
|
||||
self.client.send(req, fds=fds)
|
||||
os.close(dir_fd)
|
||||
os.close(fd)
|
||||
|
||||
ret = json.loads(self.sock.recv(1024))
|
||||
path = os.path.join("/dev", ret["devname"])
|
||||
payload, _, _ = self.client.recv()
|
||||
path = os.path.join("/dev", payload["devname"])
|
||||
try:
|
||||
yield path
|
||||
finally:
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
import asyncio
|
||||
import json
|
||||
import socket
|
||||
import subprocess
|
||||
import threading
|
||||
from .util import jsoncomm
|
||||
|
||||
|
||||
class SourcesServer:
|
||||
|
|
@ -39,22 +39,18 @@ class SourcesServer:
|
|||
except ValueError:
|
||||
return {"error": f"source returned malformed json: {r.stdout}"}
|
||||
|
||||
def _dispatch(self, sock):
|
||||
msg, addr = sock.recvfrom(65536)
|
||||
request = json.loads(msg)
|
||||
def _dispatch(self, server):
|
||||
request, _, addr = server.recv()
|
||||
reply = self._run_source(request["source"], request["checksums"])
|
||||
msg = json.dumps(reply).encode("utf-8")
|
||||
sock.sendmsg([msg], [], 0, addr)
|
||||
server.send(reply, destination=addr)
|
||||
|
||||
def _run_event_loop(self):
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
sock.bind(self.socket_address)
|
||||
self.barrier.wait()
|
||||
self.event_loop.add_reader(sock, self._dispatch, sock)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(sock)
|
||||
sock.close()
|
||||
with jsoncomm.Socket.new_server(self.socket_address) as server:
|
||||
self.barrier.wait()
|
||||
self.event_loop.add_reader(server, self._dispatch, server)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(server)
|
||||
|
||||
def __enter__(self):
|
||||
self.thread.start()
|
||||
|
|
@ -67,15 +63,13 @@ class SourcesServer:
|
|||
|
||||
|
||||
def get(source, checksums, api_path="/run/osbuild/api/sources"):
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
sock.connect(api_path)
|
||||
with jsoncomm.Socket.new_client(api_path) as client:
|
||||
msg = {
|
||||
"source": source,
|
||||
"checksums": checksums
|
||||
}
|
||||
sock.sendall(json.dumps(msg).encode('utf-8'))
|
||||
reply = json.loads(sock.recv(65536))
|
||||
client.send(msg)
|
||||
reply, _, _ = client.recv()
|
||||
if "error" in reply:
|
||||
raise RuntimeError(f"{source}: " + reply["error"])
|
||||
return reply
|
||||
|
|
|
|||
|
|
@ -1,23 +1,11 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import array
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
from osbuild.util import jsoncomm
|
||||
|
||||
|
||||
# copied from remoteloop.py
|
||||
def load_fds(sock, msglen):
|
||||
fds = array.array("i") # Array of ints
|
||||
msg, ancdata, _, addr = sock.recvmsg(msglen, socket.CMSG_LEN(253 * fds.itemsize))
|
||||
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||
if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS):
|
||||
# Append data, ignoring any truncated integers at the end.
|
||||
fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
|
||||
return json.loads(msg), list(fds), addr
|
||||
|
||||
def ldconfig():
|
||||
# ld.so.conf must exist, or `ldconfig` throws a warning
|
||||
subprocess.run(["touch", "/etc/ld.so.conf"], check=True)
|
||||
|
|
@ -49,17 +37,15 @@ def nsswitch():
|
|||
|
||||
|
||||
def setup_stdio():
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
sock.connect("/run/osbuild/api/osbuild")
|
||||
with jsoncomm.Socket.new_client("/run/osbuild/api/osbuild") as client:
|
||||
req = {'method': 'setup-stdio'}
|
||||
sock.send(json.dumps(req).encode('utf-8'))
|
||||
msg, fds, _ = load_fds(sock, 1024)
|
||||
client.send(req)
|
||||
msg, fds, _ = client.recv()
|
||||
for io in ['stdin', 'stdout', 'stderr']:
|
||||
target = getattr(sys, io)
|
||||
source = fds[msg[io]]
|
||||
os.dup2(source, target.fileno())
|
||||
os.close(source)
|
||||
fds.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -1,34 +1,21 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import array
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
from osbuild.util import jsoncomm
|
||||
|
||||
|
||||
def load_fds(sock, msglen):
|
||||
fds = array.array("i") # Array of ints
|
||||
msg, ancdata, _, addr = sock.recvmsg(msglen, socket.CMSG_LEN(253 * fds.itemsize))
|
||||
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||
if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS):
|
||||
# Append data, ignoring any truncated integers at the end.
|
||||
fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
|
||||
return json.loads(msg), list(fds), addr
|
||||
|
||||
def setup_stdio():
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
sock.connect("/run/osbuild/api/osbuild")
|
||||
with jsoncomm.Socket.new_client("/run/osbuild/api/osbuild") as client:
|
||||
req = {'method': 'setup-stdio'}
|
||||
sock.send(json.dumps(req).encode('utf-8'))
|
||||
msg, fds, _ = load_fds(sock, 1024)
|
||||
client.send(req)
|
||||
msg, fds, _ = client.recv()
|
||||
for io in ['stdin', 'stdout', 'stderr']:
|
||||
target = getattr(sys, io)
|
||||
source = fds[msg[io]]
|
||||
os.dup2(source, target.fileno())
|
||||
os.close(source)
|
||||
fds.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -1,23 +1,11 @@
|
|||
#!/usr/bin/python3.6
|
||||
|
||||
import array
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
from osbuild.util import jsoncomm
|
||||
|
||||
|
||||
# copied from remoteloop.py
|
||||
def load_fds(sock, msglen):
|
||||
fds = array.array("i") # Array of ints
|
||||
msg, ancdata, _, addr = sock.recvmsg(msglen, socket.CMSG_LEN(253 * fds.itemsize))
|
||||
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||
if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS):
|
||||
# Append data, ignoring any truncated integers at the end.
|
||||
fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
|
||||
return json.loads(msg), list(fds), addr
|
||||
|
||||
def ldconfig():
|
||||
# ld.so.conf must exist, or `ldconfig` throws a warning
|
||||
subprocess.run(["touch", "/etc/ld.so.conf"], check=True)
|
||||
|
|
@ -49,17 +37,15 @@ def nsswitch():
|
|||
|
||||
|
||||
def setup_stdio():
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
sock.connect("/run/osbuild/api/osbuild")
|
||||
with jsoncomm.Socket.new_client("/run/osbuild/api/osbuild") as client:
|
||||
req = {'method': 'setup-stdio'}
|
||||
sock.send(json.dumps(req).encode('utf-8'))
|
||||
msg, fds, _ = load_fds(sock, 1024)
|
||||
client.send(req)
|
||||
msg, fds, _ = client.recv()
|
||||
for io in ['stdin', 'stdout', 'stderr']:
|
||||
target = getattr(sys, io)
|
||||
source = fds[msg[io]]
|
||||
os.dup2(source, target.fileno())
|
||||
os.close(source)
|
||||
fds.close()
|
||||
|
||||
def os_release():
|
||||
"""/usr/lib/os-release doesn't exist. The `redhat-release` package
|
||||
|
|
|
|||
|
|
@ -1,23 +1,11 @@
|
|||
#!/usr/bin/python3.6
|
||||
|
||||
import array
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
from osbuild.util import jsoncomm
|
||||
|
||||
|
||||
# copied from remoteloop.py
|
||||
def load_fds(sock, msglen):
|
||||
fds = array.array("i") # Array of ints
|
||||
msg, ancdata, _, addr = sock.recvmsg(msglen, socket.CMSG_LEN(253 * fds.itemsize))
|
||||
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||
if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS):
|
||||
# Append data, ignoring any truncated integers at the end.
|
||||
fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
|
||||
return json.loads(msg), list(fds), addr
|
||||
|
||||
def ldconfig():
|
||||
# ld.so.conf must exist, or `ldconfig` throws a warning
|
||||
subprocess.run(["touch", "/etc/ld.so.conf"], check=True)
|
||||
|
|
@ -49,17 +37,16 @@ def nsswitch():
|
|||
|
||||
|
||||
def setup_stdio():
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
sock.connect("/run/osbuild/api/osbuild")
|
||||
with jsoncomm.Socket.new_client("/run/osbuild/api/osbuild") as client:
|
||||
req = {'method': 'setup-stdio'}
|
||||
sock.send(json.dumps(req).encode('utf-8'))
|
||||
msg, fds, _ = load_fds(sock, 1024)
|
||||
client.send(req)
|
||||
msg, fds, _ = client.recv()
|
||||
for io in ['stdin', 'stdout', 'stderr']:
|
||||
target = getattr(sys, io)
|
||||
source = fds[msg[io]]
|
||||
os.dup2(source, target.fileno())
|
||||
os.close(source)
|
||||
fds.close()
|
||||
|
||||
|
||||
def python_alternatives():
|
||||
"""/usr/bin/python3 is a symlink to /etc/alternatives/python3, which points
|
||||
|
|
|
|||
|
|
@ -1,23 +1,11 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import array
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
from osbuild.util import jsoncomm
|
||||
|
||||
|
||||
# copied from remoteloop.py
|
||||
def load_fds(sock, msglen):
|
||||
fds = array.array("i") # Array of ints
|
||||
msg, ancdata, _, addr = sock.recvmsg(msglen, socket.CMSG_LEN(253 * fds.itemsize))
|
||||
for cmsg_level, cmsg_type, cmsg_data in ancdata:
|
||||
if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS):
|
||||
# Append data, ignoring any truncated integers at the end.
|
||||
fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
|
||||
return json.loads(msg), list(fds), addr
|
||||
|
||||
def ldconfig():
|
||||
# ld.so.conf must exist, or `ldconfig` throws a warning
|
||||
subprocess.run(["touch", "/etc/ld.so.conf"], check=True)
|
||||
|
|
@ -49,17 +37,15 @@ def nsswitch():
|
|||
|
||||
|
||||
def setup_stdio():
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
sock.connect("/run/osbuild/api/osbuild")
|
||||
with jsoncomm.Socket.new_client("/run/osbuild/api/osbuild") as client:
|
||||
req = {'method': 'setup-stdio'}
|
||||
sock.send(json.dumps(req).encode('utf-8'))
|
||||
msg, fds, _ = load_fds(sock, 1024)
|
||||
client.send(req)
|
||||
msg, fds, _ = client.recv()
|
||||
for io in ['stdin', 'stdout', 'stderr']:
|
||||
target = getattr(sys, io)
|
||||
source = fds[msg[io]]
|
||||
os.dup2(source, target.fileno())
|
||||
os.close(source)
|
||||
fds.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue