From ca126e97471d6a5169327accb83b88c7353d72b7 Mon Sep 17 00:00:00 2001 From: Thomas Lavocat Date: Tue, 23 Nov 2021 13:55:31 +0100 Subject: [PATCH] dnf-json: Change dnf-json to be a daemon The service is started via systemd activation sockets. The service serves http POST requests, the same json as before is expected as the body of the request, and the same json as before is sent as the response of the request. --- cmd/osbuild-composer/composer.go | 2 +- cmd/osbuild-dnf-json-tests/main_test.go | 13 +- cmd/osbuild-pipeline/main.go | 2 +- cmd/osbuild-store-dump/main.go | 2 +- cmd/osbuild-worker/jobimpl-depsolve.go | 2 +- distribution/osbuild-composer.service | 1 + distribution/osbuild-dnf-json.service | 13 + distribution/osbuild-dnf-json.socket | 9 + distribution/osbuild-remote-worker@.service | 1 + distribution/osbuild-worker@.service | 2 +- dnf-json | 252 +++++++++++------- .../distro_test_common/distro_test_common.go | 2 +- internal/rpmmd/repository.go | 55 ++-- osbuild-composer.spec | 2 + 14 files changed, 217 insertions(+), 141 deletions(-) create mode 100644 distribution/osbuild-dnf-json.service create mode 100644 distribution/osbuild-dnf-json.socket diff --git a/cmd/osbuild-composer/composer.go b/cmd/osbuild-composer/composer.go index 051c51383..2ea508a1a 100644 --- a/cmd/osbuild-composer/composer.go +++ b/cmd/osbuild-composer/composer.go @@ -66,7 +66,7 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string) (*Compos c.distros = distroregistry.NewDefault() logrus.Infof("Loaded %d distros", len(c.distros.List())) - c.rpm = rpmmd.NewRPMMD(path.Join(c.cacheDir, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json") + c.rpm = rpmmd.NewRPMMD(path.Join(c.cacheDir, "rpmmd")) var jobs jobqueue.JobQueue if config.Worker.PGDatabase != "" { diff --git a/cmd/osbuild-dnf-json-tests/main_test.go b/cmd/osbuild-dnf-json-tests/main_test.go index 1b74dcb0d..003c8d6f0 100644 --- a/cmd/osbuild-dnf-json-tests/main_test.go +++ b/cmd/osbuild-dnf-json-tests/main_test.go @@ -5,8 +5,8 @@ package main import ( - "fmt" "io/ioutil" + "net/http" "os" "path" "testing" @@ -23,6 +23,11 @@ import ( func TestFetchChecksum(t *testing.T) { dir, err := test.SetUpTemporaryRepository() + fs := http.FileServer(http.Dir(dir)) + go func() { + err := http.ListenAndServe(":9000", fs) + assert.Nilf(t, err, "Could not start the http server: %v", err) + }() defer func(dir string) { err := test.TearDownTemporaryRepository(dir) assert.Nil(t, err, "Failed to clean up temporary repository.") @@ -31,13 +36,13 @@ func TestFetchChecksum(t *testing.T) { repoCfg := rpmmd.RepoConfig{ Name: "repo", - BaseURL: fmt.Sprintf("file://%s", dir), + BaseURL: "http://localhost:9000", IgnoreSSL: true, } // use a fullpath to dnf-json, this allows this test to have an arbitrary // working directory - rpmMetadata := rpmmd.NewRPMMD(path.Join(dir, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json") + rpmMetadata := rpmmd.NewRPMMD(path.Join(dir, "rpmmd")) _, c, err := rpmMetadata.FetchMetadata([]rpmmd.RepoConfig{repoCfg}, "platform:f31", "x86_64", "31") assert.Nilf(t, err, "Failed to fetch checksum: %v", err) assert.NotEqual(t, "", c["repo"], "The checksum is empty") @@ -64,7 +69,7 @@ func TestCrossArchDepsolve(t *testing.T) { // use a fullpath to dnf-json, this allows this test to have an arbitrary // working directory - rpm := rpmmd.NewRPMMD(dir, "/usr/libexec/osbuild-composer/dnf-json") + rpm := rpmmd.NewRPMMD(dir) repos, err := rpmmd.LoadRepositories([]string{repoDir}, distroStruct.Name()) require.NoErrorf(t, err, "Failed to LoadRepositories %v", distroStruct.Name()) diff --git a/cmd/osbuild-pipeline/main.go b/cmd/osbuild-pipeline/main.go index 7496028f8..d80493de2 100644 --- a/cmd/osbuild-pipeline/main.go +++ b/cmd/osbuild-pipeline/main.go @@ -118,7 +118,7 @@ func main() { panic("os.UserHomeDir(): " + err.Error()) } - rpm_md := rpmmd.NewRPMMD(path.Join(home, ".cache/osbuild-composer/rpmmd"), "/usr/libexec/osbuild-composer/dnf-json") + rpm_md := rpmmd.NewRPMMD(path.Join(home, ".cache/osbuild-composer/rpmmd")) packageSpecSets := make(map[string][]rpmmd.PackageSpec) for name, packages := range packageSets { diff --git a/cmd/osbuild-store-dump/main.go b/cmd/osbuild-store-dump/main.go index 4f2599268..bf23daece 100644 --- a/cmd/osbuild-store-dump/main.go +++ b/cmd/osbuild-store-dump/main.go @@ -138,7 +138,7 @@ func main() { if err != nil { panic("os.UserHomeDir(): " + err.Error()) } - rpmmd := rpmmd.NewRPMMD(path.Join(homeDir, ".cache/osbuild-composer/rpmmd"), "/usr/libexec/osbuild-composer/dnf-json") + rpmmd := rpmmd.NewRPMMD(path.Join(homeDir, ".cache/osbuild-composer/rpmmd")) s := store.New(&cwd, a, nil) if s == nil { diff --git a/cmd/osbuild-worker/jobimpl-depsolve.go b/cmd/osbuild-worker/jobimpl-depsolve.go index 091d7d4f2..c42272741 100644 --- a/cmd/osbuild-worker/jobimpl-depsolve.go +++ b/cmd/osbuild-worker/jobimpl-depsolve.go @@ -12,7 +12,7 @@ type DepsolveJobImpl struct { } func (impl *DepsolveJobImpl) depsolve(packageSets map[string]rpmmd.PackageSet, repos []rpmmd.RepoConfig, modulePlatformID, arch, releasever string) (map[string][]rpmmd.PackageSpec, error) { - rpmMD := rpmmd.NewRPMMD(impl.RPMMDCache, "/usr/libexec/osbuild-composer/dnf-json") + rpmMD := rpmmd.NewRPMMD(impl.RPMMDCache) packageSpecs := make(map[string][]rpmmd.PackageSpec) for name, packageSet := range packageSets { diff --git a/distribution/osbuild-composer.service b/distribution/osbuild-composer.service index a277c8604..9d88b0d81 100644 --- a/distribution/osbuild-composer.service +++ b/distribution/osbuild-composer.service @@ -1,5 +1,6 @@ [Unit] Description=OSBuild Composer +Requires=osbuild-dnf-json.socket After=network.target # Weldr API needs a local worker by default. diff --git a/distribution/osbuild-dnf-json.service b/distribution/osbuild-dnf-json.service new file mode 100644 index 000000000..1f012fb46 --- /dev/null +++ b/distribution/osbuild-dnf-json.service @@ -0,0 +1,13 @@ +[Unit] +Description=OSbuild dnf-json service +Requires=osbuild-dnf-json.socket + +[Service] +Type=simple +PrivateTmp=true +ExecStart=/usr/libexec/osbuild-composer/dnf-json +Restart=always +RestartSec=1s +CacheDirectory=osbuild-dnf-json +# systemd >= 240 sets this, but osbuild-worker runs on earlier versions +Environment="CACHE_DIRECTORY=/var/cache/osbuild-dnf-json" diff --git a/distribution/osbuild-dnf-json.socket b/distribution/osbuild-dnf-json.socket new file mode 100644 index 000000000..231f75e1a --- /dev/null +++ b/distribution/osbuild-dnf-json.socket @@ -0,0 +1,9 @@ +[Unit] +Description=OSbuild dnf-json socket + +[Socket] +Service=osbuild-dnf-json.service +ListenStream=/run/osbuild-dnf-json/api.sock + +[Install] +WantedBy=sockets.target diff --git a/distribution/osbuild-remote-worker@.service b/distribution/osbuild-remote-worker@.service index ab19e95c3..965c5b7ed 100644 --- a/distribution/osbuild-remote-worker@.service +++ b/distribution/osbuild-remote-worker@.service @@ -1,5 +1,6 @@ [Unit] Description=OSBuild Composer Remote Worker (%i) +Requires=osbuild-dnf-json.socket After=network.target [Service] diff --git a/distribution/osbuild-worker@.service b/distribution/osbuild-worker@.service index 4423085d8..335122304 100644 --- a/distribution/osbuild-worker@.service +++ b/distribution/osbuild-worker@.service @@ -1,6 +1,6 @@ [Unit] Description=OSBuild Composer Worker (%i) -Requires=osbuild-local-worker.socket +Requires=osbuild-local-worker.socket osbuild-dnf-json.socket After=network.target osbuild-local-worker.socket [Service] diff --git a/dnf-json b/dnf-json index cc7cfac3b..37411dfd4 100755 --- a/dnf-json +++ b/dnf-json @@ -1,21 +1,30 @@ #!/usr/bin/python3 import datetime -import dnf import hashlib -import hawkey import json -import sys import tempfile +import os +import socket +import socketserver +import logging +from http.server import BaseHTTPRequestHandler + +import dnf +import hawkey +from systemd.journal import JournalHandler + + +log = logging.getLogger('dnf-json') +log.addHandler(JournalHandler()) +log.setLevel(logging.INFO) DNF_ERROR_EXIT_CODE = 10 - -def timestamp_to_rfc3339(timestamp): +def timestamp_to_rfc3339(timestamp, package): d = datetime.datetime.utcfromtimestamp(package.buildtime) return d.strftime('%Y-%m-%dT%H:%M:%SZ') - def dnfrepo(desc, parent_conf=None): """Makes a dnf.repo.Repo out of a JSON repository description""" @@ -93,12 +102,6 @@ def create_base(repos, module_platform_id, persistdir, cachedir, arch): base.fill_sack(load_system_repo=False) return base - -def exit_with_dnf_error(kind: str, reason: str): - json.dump({"kind": kind, "reason": reason}, sys.stdout) - sys.exit(DNF_ERROR_EXIT_CODE) - - def repo_checksums(base): checksums = {} for repo in base.repos.iter_enabled(): @@ -123,98 +126,147 @@ def repo_checksums(base): return checksums +class DnfJsonRequestHandler(BaseHTTPRequestHandler): -call = json.load(sys.stdin) -command = call["command"] -arguments = call["arguments"] -repos = arguments.get("repos", {}) -arch = arguments["arch"] -cachedir = arguments["cachedir"] -module_platform_id = arguments["module_platform_id"] + def _send(self): + self.client_address=('',) -with tempfile.TemporaryDirectory() as persistdir: - try: - base = create_base( - repos, - module_platform_id, - persistdir, - cachedir, - arch - ) - except dnf.exceptions.Error as e: - exit_with_dnf_error( - type(e).__name__, - f"Error occurred when setting up repo: {e}" - ) + def exit_with_dnf_error(self, kind: str, reason: str): + self._send() + self.send_response(500) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"kind": kind, "reason": + reason}).encode("utf-8")) - if command == "dump": - packages = [] - for package in base.sack.query().available(): - packages.append({ - "name": package.name, - "summary": package.summary, - "description": package.description, - "url": package.url, - "epoch": package.epoch, - "version": package.version, - "release": package.release, - "arch": package.arch, - "buildtime": timestamp_to_rfc3339(package.buildtime), - "license": package.license - }) - json.dump({ - "checksums": repo_checksums(base), - "packages": packages - }, sys.stdout) + def exit_with_success(self, json_object): + self._send() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(json_object).encode("utf-8")) - elif command == "depsolve": - errors = [] + def do_POST(self): + content_len = int(self.headers.get('Content-Length')) + data = self.rfile.read(content_len) + call = json.loads(data.decode("utf-8")) + command = call["command"] + arguments = call["arguments"] + repos = arguments.get("repos", {}) + arch = arguments["arch"] + cachedir = arguments["cachedir"] + module_platform_id = arguments["module_platform_id"] - try: - base.install_specs( - arguments["package-specs"], - exclude=arguments.get("exclude-specs", []) - ) - except dnf.exceptions.MarkingErrors as e: - exit_with_dnf_error( - "MarkingErrors", - f"Error occurred when marking packages for installation: {e}" - ) - - try: - base.resolve() - except dnf.exceptions.DepsolveError as e: - exit_with_dnf_error( - "DepsolveError", - ( - "There was a problem depsolving " - f"{arguments['package-specs']}: {e}" + with tempfile.TemporaryDirectory() as persistdir: + try: + base = create_base( + repos, + module_platform_id, + persistdir, + cachedir, + arch ) - ) - - dependencies = [] - for tsi in base.transaction: - # Avoid using the install_set() helper, as it does not guarantee - # a stable order - if tsi.action not in dnf.transaction.FORWARD_ACTIONS: - continue - package = tsi.pkg - - dependencies.append({ - "name": package.name, - "epoch": package.epoch, - "version": package.version, - "release": package.release, - "arch": package.arch, - "repo_id": package.reponame, - "path": package.relativepath, - "remote_location": package.remote_location(), - "checksum": ( - f"{hawkey.chksum_name(package.chksum[0])}:" - f"{package.chksum[1].hex()}" + except dnf.exceptions.Error as e: + return self.exit_with_dnf_error( + type(e).__name__, + f"Error occurred when setting up repo: {e}" ) - }) - json.dump({ - "checksums": repo_checksums(base), - "dependencies": dependencies - }, sys.stdout) + + if command == "dump": + packages = [] + for package in base.sack.query().available(): + packages.append({ + "name": package.name, + "summary": package.summary, + "description": package.description, + "url": package.url, + "epoch": package.epoch, + "version": package.version, + "release": package.release, + "arch": package.arch, + "buildtime": timestamp_to_rfc3339(package.buildtime, package), + "license": package.license + }) + self.exit_with_success({ + "checksums": repo_checksums(base), + "packages": packages + }) + + elif command == "depsolve": + log.info("start depsolve") + errors = [] + + try: + base.install_specs( + arguments["package-specs"], + exclude=arguments.get("exclude-specs", []) + ) + except dnf.exceptions.MarkingErrors as e: + log.info("error install_specs") + return self.exit_with_dnf_error( + "MarkingErrors", + f"Error occurred when marking packages for installation: {e}" + ) + + try: + base.resolve() + except dnf.exceptions.DepsolveError as e: + log.info("error depsolve") + return self.exit_with_dnf_error( + "DepsolveError", + ( + "There was a problem depsolving " + f"{arguments['package-specs']}: {e}" + ) + ) + + dependencies = [] + for tsi in base.transaction: + # Avoid using the install_set() helper, as it does not guarantee + # a stable order + if tsi.action not in dnf.transaction.FORWARD_ACTIONS: + continue + package = tsi.pkg + + dependencies.append({ + "name": package.name, + "epoch": package.epoch, + "version": package.version, + "release": package.release, + "arch": package.arch, + "repo_id": package.reponame, + "path": package.relativepath, + "remote_location": package.remote_location(), + "checksum": ( + f"{hawkey.chksum_name(package.chksum[0])}:" + f"{package.chksum[1].hex()}" + ) + }) + self.exit_with_success({ + "checksums": repo_checksums(base), + "dependencies": dependencies + }) + +log.info("Starting the dnf-json server") + +SOCKET_FILE_PATH = "/run/osbuild-dnf-json/api.sock" + +LISTEN_FDS = int(os.environ.get("LISTEN_FDS", 0)) +LISTEN_PID = os.environ.get("LISTEN_PID", None) or os.getpid() + +log.debug("env %d and %d", LISTEN_FDS, LISTEN_PID) + +class SystemDActivationSocketServer(socketserver.UnixStreamServer): + def server_bind(self): + log.debug("service bind") + if LISTEN_FDS == 0: + log.debug("create new socket") + socketserver.UnixStreamServer.server_bind(self) + else: + log.debug("rebind socket") + log.debug("address_family: %d ", self.address_family) + log.debug("socket_type: %d ", self.socket_type) + self.socket = socket.fromfd(3, self.address_family, self.socket_type) + +server = SystemDActivationSocketServer(SOCKET_FILE_PATH, DnfJsonRequestHandler) +server.serve_forever() diff --git a/internal/distro/distro_test_common/distro_test_common.go b/internal/distro/distro_test_common/distro_test_common.go index 6a1437be4..457b0aac9 100644 --- a/internal/distro/distro_test_common/distro_test_common.go +++ b/internal/distro/distro_test_common/distro_test_common.go @@ -132,7 +132,7 @@ func TestDistro_Manifest(t *testing.T, pipelinePath string, prefix string, regis func getImageTypePkgSpecSets(imageType distro.ImageType, bp blueprint.Blueprint, repos []rpmmd.RepoConfig, cacheDir, dnfJsonPath string) map[string][]rpmmd.PackageSpec { imgPackageSets := imageType.PackageSets(bp) - rpm_md := rpmmd.NewRPMMD(cacheDir, dnfJsonPath) + rpm_md := rpmmd.NewRPMMD(cacheDir) imgPackageSpecSets := make(map[string][]rpmmd.PackageSpec) for name, packages := range imgPackageSets { diff --git a/internal/rpmmd/repository.go b/internal/rpmmd/repository.go index a338c01b3..1f87ac431 100644 --- a/internal/rpmmd/repository.go +++ b/internal/rpmmd/repository.go @@ -1,12 +1,15 @@ package rpmmd import ( + "bytes" + "context" "encoding/json" "fmt" "io/ioutil" "log" + "net" + "net/http" "os" - "os/exec" "path/filepath" "sort" "strconv" @@ -313,7 +316,7 @@ func LoadRepositories(confPaths []string, distro string) (map[string][]RepoConfi return repoConfigs, nil } -func runDNF(dnfJsonPath string, command string, arguments interface{}, result interface{}) error { +func runDNF(command string, arguments interface{}, result interface{}) error { var call = struct { Command string `json:"command"` Arguments interface{} `json:"arguments,omitempty"` @@ -322,39 +325,31 @@ func runDNF(dnfJsonPath string, command string, arguments interface{}, result in arguments, } - cmd := exec.Command(dnfJsonPath) + httpc := http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", "/run/osbuild-dnf-json/api.sock") + }, + }, + } - stdin, err := cmd.StdinPipe() + bpost, err := json.Marshal(call) if err != nil { return err } - cmd.Stderr = os.Stderr - stdout, err := cmd.StdoutPipe() + response, err := httpc.Post("http://unix", "application/json", bytes.NewReader(bpost)) + if err != nil { + return err + } + defer response.Body.Close() + + output, err := ioutil.ReadAll(response.Body) if err != nil { return err } - err = cmd.Start() - if err != nil { - return err - } - - err = json.NewEncoder(stdin).Encode(call) - if err != nil { - return err - } - stdin.Close() - - output, err := ioutil.ReadAll(stdout) - if err != nil { - return err - } - - err = cmd.Wait() - - const DnfErrorExitCode = 10 - if runError, ok := err.(*exec.ExitError); ok && runError.ExitCode() == DnfErrorExitCode { + if response.StatusCode != http.StatusOK { var dnfError DNFError err = json.Unmarshal(output, &dnfError) if err != nil { @@ -376,10 +371,9 @@ func runDNF(dnfJsonPath string, command string, arguments interface{}, result in type rpmmdImpl struct { CacheDir string subscriptions *rhsm.Subscriptions - dnfJsonPath string } -func NewRPMMD(cacheDir, dnfJsonPath string) RPMMD { +func NewRPMMD(cacheDir string) RPMMD { subscriptions, err := rhsm.LoadSystemSubscriptions() if err != nil { log.Println("Failed to load subscriptions. osbuild-composer will fail to build images if the "+ @@ -391,7 +385,6 @@ func NewRPMMD(cacheDir, dnfJsonPath string) RPMMD { return &rpmmdImpl{ CacheDir: cacheDir, subscriptions: subscriptions, - dnfJsonPath: dnfJsonPath, } } @@ -442,7 +435,7 @@ func (r *rpmmdImpl) FetchMetadata(repos []RepoConfig, modulePlatformID, arch, re Packages PackageList `json:"packages"` } - err := runDNF(r.dnfJsonPath, "dump", arguments, &reply) + err := runDNF("dump", arguments, &reply) sort.Slice(reply.Packages, func(i, j int) bool { return reply.Packages[i].Name < reply.Packages[j].Name @@ -477,7 +470,7 @@ func (r *rpmmdImpl) Depsolve(packageSet PackageSet, repos []RepoConfig, modulePl Checksums map[string]string `json:"checksums"` Dependencies []dnfPackageSpec `json:"dependencies"` } - err := runDNF(r.dnfJsonPath, "depsolve", arguments, &reply) + err := runDNF("depsolve", arguments, &reply) dependencies := make([]PackageSpec, len(reply.Dependencies)) for i, pack := range reply.Dependencies { diff --git a/osbuild-composer.spec b/osbuild-composer.spec index f34acce21..0ed4b71cf 100644 --- a/osbuild-composer.spec +++ b/osbuild-composer.spec @@ -375,6 +375,8 @@ The dnf-json binary used by osbuild-composer and the workers. %files dnf-json %{_libexecdir}/osbuild-composer/dnf-json +%{_unitdir}/osbuild-dnf-json.service +%{_unitdir}/osbuild-dnf-json.socket %if %{with tests} || 0%{?rhel}