inputs: convert to host service
Create a `InputService` class with an abstract method called `map`, meant to be implemented by all inputs. An `unmap` method may be optionally overridden by inputs to cleanup resources. Instantiate a `host.ServiceManager` in the `Stage.run` section and pass the to the host side input code so it can be used to spawn the input services. Convert all existing inputs to the new service framework.
This commit is contained in:
parent
08bc9ab7d8
commit
1ed85dc790
6 changed files with 122 additions and 121 deletions
|
|
@ -14,12 +14,10 @@ like `rpm.` to avoid namespace clashes. This is enforced via
|
||||||
schema validation.
|
schema validation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
import json
|
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from osbuild.objectstore import StoreClient
|
from osbuild import inputs
|
||||||
|
|
||||||
|
|
||||||
SCHEMA = r"""
|
SCHEMA = r"""
|
||||||
|
|
@ -77,16 +75,12 @@ SCHEMA = r"""
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def main():
|
class FilesInput(inputs.InputService):
|
||||||
args = json.load(sys.stdin)
|
|
||||||
refs = args["refs"]
|
|
||||||
target = args["target"]
|
|
||||||
|
|
||||||
store = StoreClient(connect_to=args["api"]["store"])
|
def map(self, store, _origin, refs, target, _options):
|
||||||
source = store.source("org.osbuild.files")
|
|
||||||
|
|
||||||
for checksum in refs:
|
source = store.source("org.osbuild.files")
|
||||||
try:
|
for checksum in refs:
|
||||||
subprocess.run(
|
subprocess.run(
|
||||||
[
|
[
|
||||||
"ln",
|
"ln",
|
||||||
|
|
@ -95,21 +89,20 @@ def main():
|
||||||
],
|
],
|
||||||
check=True,
|
check=True,
|
||||||
)
|
)
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
json.dump({"error": e.output}, sys.stdout)
|
|
||||||
return 1
|
|
||||||
|
|
||||||
reply = {
|
reply = {
|
||||||
"path": target,
|
"path": target,
|
||||||
"data": {
|
"data": {
|
||||||
"refs": refs
|
"refs": refs
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
return reply
|
||||||
|
|
||||||
json.dump(reply, sys.stdout)
|
|
||||||
return 0
|
def main():
|
||||||
|
service = FilesInput.from_args(sys.argv[1:])
|
||||||
|
service.main()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
r = main()
|
main()
|
||||||
sys.exit(r)
|
|
||||||
|
|
|
||||||
|
|
@ -7,31 +7,38 @@ it to the stage.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from osbuild import inputs
|
||||||
|
|
||||||
SCHEMA = """
|
SCHEMA = """
|
||||||
"additionalProperties": true
|
"additionalProperties": true
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class NoopInput(inputs.InputService):
|
||||||
|
|
||||||
|
def map(self, _store, _origin, refs, target, _options):
|
||||||
|
|
||||||
|
uid = str(uuid.uuid4())
|
||||||
|
path = os.path.join(target, uid)
|
||||||
|
os.makedirs(path)
|
||||||
|
|
||||||
|
reply = {
|
||||||
|
"path": target,
|
||||||
|
"data": {
|
||||||
|
"refs": refs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return reply
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
args = json.load(sys.stdin)
|
service = NoopInput.from_args(sys.argv[1:])
|
||||||
refs = args["refs"]
|
service.main()
|
||||||
target = args["target"]
|
|
||||||
|
|
||||||
uid = str(uuid.uuid4())
|
|
||||||
path = os.path.join(target, uid)
|
|
||||||
os.makedirs(path)
|
|
||||||
|
|
||||||
data = {"path": path, "data": {"refs": refs}}
|
|
||||||
json.dump(data, sys.stdout)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
r = main()
|
main()
|
||||||
sys.exit(r)
|
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ import json
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from osbuild.objectstore import StoreClient
|
from osbuild import inputs
|
||||||
|
|
||||||
|
|
||||||
SCHEMA = """
|
SCHEMA = """
|
||||||
|
|
@ -99,31 +99,31 @@ def export(checksums, cache, output):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
json.dump(reply, sys.stdout)
|
return reply
|
||||||
|
|
||||||
|
|
||||||
|
class OSTreeInput(inputs.InputService):
|
||||||
|
|
||||||
|
def map(self, store, origin, refs, target, _options):
|
||||||
|
|
||||||
|
if origin == "org.osbuild.pipeline":
|
||||||
|
for ref, options in refs.items():
|
||||||
|
source = store.read_tree(ref)
|
||||||
|
with open(os.path.join(source, "compose.json"), "r") as f:
|
||||||
|
compose = json.load(f)
|
||||||
|
commit_id = compose["ostree-commit"]
|
||||||
|
reply = export({commit_id: options}, source, target)
|
||||||
|
else:
|
||||||
|
source = store.source("org.osbuild.ostree")
|
||||||
|
reply = export(refs, source, target)
|
||||||
|
|
||||||
|
return reply
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
args = json.load(sys.stdin)
|
service = OSTreeInput.from_args(sys.argv[1:])
|
||||||
refs = args["refs"]
|
service.main()
|
||||||
target = args["target"]
|
|
||||||
|
|
||||||
origin = args["origin"]
|
|
||||||
store = StoreClient(connect_to=args["api"]["store"])
|
|
||||||
|
|
||||||
if origin == "org.osbuild.pipeline":
|
|
||||||
for ref, options in refs.items():
|
|
||||||
source = store.read_tree(ref)
|
|
||||||
with open(os.path.join(source, "compose.json"), "r") as f:
|
|
||||||
compose = json.load(f)
|
|
||||||
commit_id = compose["ostree-commit"]
|
|
||||||
export({commit_id: options}, source, target)
|
|
||||||
else:
|
|
||||||
source = store.source("org.osbuild.ostree")
|
|
||||||
export(refs, source, target)
|
|
||||||
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
r = main()
|
main()
|
||||||
sys.exit(r)
|
|
||||||
|
|
|
||||||
|
|
@ -8,11 +8,9 @@ in read only mode. If the id is `null` or the empty
|
||||||
string it returns an empty tree.
|
string it returns an empty tree.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
import json
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from osbuild.objectstore import StoreClient
|
from osbuild import inputs
|
||||||
|
|
||||||
|
|
||||||
SCHEMA = """
|
SCHEMA = """
|
||||||
|
|
@ -52,33 +50,29 @@ SCHEMA = """
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def error(msg):
|
class TreeInput(inputs.InputService):
|
||||||
json.dump({"error": msg}, sys.stdout)
|
|
||||||
sys.exit(1)
|
def map(self, store, _origin, refs, target, _options):
|
||||||
|
|
||||||
|
# input verification *must* have been done via schema
|
||||||
|
# verification. It is expected that origin is a pipeline
|
||||||
|
# and we have exactly one reference, i.e. a pipeline id
|
||||||
|
pid, _ = refs.popitem()
|
||||||
|
|
||||||
|
if pid:
|
||||||
|
path = store.read_tree_at(pid, target)
|
||||||
|
|
||||||
|
if not path:
|
||||||
|
raise ValueError(f"Unknown pipeline '{pid}'")
|
||||||
|
|
||||||
|
reply = {"path": target}
|
||||||
|
return reply
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
args = json.load(sys.stdin)
|
service = TreeInput.from_args(sys.argv[1:])
|
||||||
refs = args["refs"]
|
service.main()
|
||||||
target = args["target"]
|
|
||||||
|
|
||||||
# input verification *must* have been done via schema
|
|
||||||
# verification. It is expected that origin is a pipeline
|
|
||||||
# and we have exactly one reference, i.e. a pipeline id
|
|
||||||
pid, _ = refs.popitem()
|
|
||||||
|
|
||||||
store = StoreClient(connect_to=args["api"]["store"])
|
|
||||||
|
|
||||||
if pid:
|
|
||||||
path = store.read_tree_at(pid, target)
|
|
||||||
|
|
||||||
if not path:
|
|
||||||
error(f"Could not find pipeline with id '{pid}'")
|
|
||||||
|
|
||||||
json.dump({"path": target}, sys.stdout)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
r = main()
|
main()
|
||||||
sys.exit(r)
|
|
||||||
|
|
|
||||||
|
|
@ -16,17 +16,16 @@ osbuild is the path. The input options are just passed to the
|
||||||
`Input` as is and the result is forwarded to the `Stage`.
|
`Input` as is and the result is forwarded to the `Stage`.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import abc
|
||||||
import hashlib
|
import hashlib
|
||||||
import importlib
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import subprocess
|
|
||||||
|
|
||||||
from typing import Dict, Optional, Tuple
|
from typing import Dict, Optional, Tuple
|
||||||
|
|
||||||
|
from osbuild import host
|
||||||
from osbuild.util.types import PathLike
|
from osbuild.util.types import PathLike
|
||||||
from .objectstore import StoreServer
|
from .objectstore import StoreClient, StoreServer
|
||||||
|
|
||||||
|
|
||||||
class Input:
|
class Input:
|
||||||
|
|
@ -59,13 +58,15 @@ class Input:
|
||||||
m.update(json.dumps(self.options, sort_keys=True).encode())
|
m.update(json.dumps(self.options, sort_keys=True).encode())
|
||||||
return m.hexdigest()
|
return m.hexdigest()
|
||||||
|
|
||||||
def run(self, storeapi: StoreServer, root: PathLike) -> Tuple[str, Dict]:
|
def map(self,
|
||||||
name = self.info.name
|
mgr: host.ServiceManager,
|
||||||
|
storeapi: StoreServer,
|
||||||
|
root: PathLike) -> Tuple[str, Dict]:
|
||||||
|
|
||||||
target = os.path.join(root, self.name)
|
target = os.path.join(root, self.name)
|
||||||
os.makedirs(target)
|
os.makedirs(target)
|
||||||
|
|
||||||
msg = {
|
args = {
|
||||||
# mandatory bits
|
# mandatory bits
|
||||||
"origin": self.origin,
|
"origin": self.origin,
|
||||||
"refs": self.refs,
|
"refs": self.refs,
|
||||||
|
|
@ -81,32 +82,8 @@ class Input:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# We want the `osbuild` python package that contains this
|
client = mgr.start(f"input/{self.name}", self.info.path)
|
||||||
# very module, which might be different from the system wide
|
reply = client.call("map", args)
|
||||||
# installed one, to be accessible to the Input programs so
|
|
||||||
# we detect our origin and set the `PYTHONPATH` accordingly
|
|
||||||
modorigin = importlib.util.find_spec("osbuild").origin
|
|
||||||
modpath = os.path.dirname(modorigin)
|
|
||||||
env = os.environ.copy()
|
|
||||||
env["PYTHONPATH"] = os.path.dirname(modpath)
|
|
||||||
|
|
||||||
r = subprocess.run([self.info.path],
|
|
||||||
env=env,
|
|
||||||
input=json.dumps(msg),
|
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
encoding="utf-8",
|
|
||||||
check=False)
|
|
||||||
|
|
||||||
try:
|
|
||||||
reply = json.loads(r.stdout)
|
|
||||||
except ValueError:
|
|
||||||
raise RuntimeError(f"{name}: error: {r.stderr}") from None
|
|
||||||
|
|
||||||
if "error" in reply:
|
|
||||||
raise RuntimeError(f"{name}: " + reply["error"])
|
|
||||||
|
|
||||||
if r.returncode != 0:
|
|
||||||
raise RuntimeError(f"{name}: error {r.returncode}")
|
|
||||||
|
|
||||||
path = reply["path"]
|
path = reply["path"]
|
||||||
|
|
||||||
|
|
@ -116,3 +93,29 @@ class Input:
|
||||||
reply["path"] = os.path.relpath(path, root)
|
reply["path"] = os.path.relpath(path, root)
|
||||||
|
|
||||||
return reply
|
return reply
|
||||||
|
|
||||||
|
|
||||||
|
class InputService(host.Service):
|
||||||
|
"""Input host service"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def map(self, store, origin, refs, target, options):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def unmap(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.unmap()
|
||||||
|
|
||||||
|
def dispatch(self, method: str, args, _fds):
|
||||||
|
if method == "map":
|
||||||
|
store = StoreClient(connect_to=args["api"]["store"])
|
||||||
|
r = self.map(store,
|
||||||
|
args["origin"],
|
||||||
|
args["refs"],
|
||||||
|
args["target"],
|
||||||
|
args["options"])
|
||||||
|
return r, None
|
||||||
|
|
||||||
|
raise host.ProtocolError("Unknown method")
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ from typing import Dict, Iterator, List, Optional
|
||||||
|
|
||||||
from .api import API
|
from .api import API
|
||||||
from . import buildroot
|
from . import buildroot
|
||||||
|
from . import host
|
||||||
from . import objectstore
|
from . import objectstore
|
||||||
from . import remoteloop
|
from . import remoteloop
|
||||||
from .inputs import Input
|
from .inputs import Input
|
||||||
|
|
@ -95,8 +96,11 @@ class Stage:
|
||||||
storeapi = objectstore.StoreServer(store)
|
storeapi = objectstore.StoreServer(store)
|
||||||
cm.enter_context(storeapi)
|
cm.enter_context(storeapi)
|
||||||
|
|
||||||
|
mgr = host.ServiceManager(monitor=monitor)
|
||||||
|
cm.enter_context(mgr)
|
||||||
|
|
||||||
for key, ip in self.inputs.items():
|
for key, ip in self.inputs.items():
|
||||||
data = ip.run(storeapi, inputs_tmpdir)
|
data = ip.map(mgr, storeapi, inputs_tmpdir)
|
||||||
inputs[key] = data
|
inputs[key] = data
|
||||||
|
|
||||||
api = API(args, monitor)
|
api = API(args, monitor)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue