From e904397fdb44edc5cf8119e7e946f96b6a3db38f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Fri, 10 Sep 2021 18:39:17 +0200 Subject: [PATCH] cloudapi/v2: Use worker to depsolve MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ondřej Budai --- cmd/osbuild-worker/jobimpl-depsolve.go | 6 ++++ internal/cloudapi/v2/errors.go | 4 +++ internal/cloudapi/v2/v2.go | 44 +++++++++++++++++------ internal/cloudapi/v2/v2_test.go | 49 +++++++++++++++++++++----- internal/worker/json.go | 8 +++++ internal/worker/server.go | 4 +++ 6 files changed, 96 insertions(+), 19 deletions(-) diff --git a/cmd/osbuild-worker/jobimpl-depsolve.go b/cmd/osbuild-worker/jobimpl-depsolve.go index 74d58eb7e..639548065 100644 --- a/cmd/osbuild-worker/jobimpl-depsolve.go +++ b/cmd/osbuild-worker/jobimpl-depsolve.go @@ -33,6 +33,12 @@ func (impl *DepsolveJobImpl) Run(job worker.Job) error { var result worker.DepsolveJobResult result.PackageSpecs, err = impl.depsolve(args.PackageSets, args.Repos, args.ModulePlatformID, args.Arch, args.Releasever) if err != nil { + switch err.(type) { + case *rpmmd.DNFError: + result.ErrorType = worker.DepsolveErrorType + case error: + result.ErrorType = worker.OtherErrorType + } result.Error = err.Error() } diff --git a/internal/cloudapi/v2/errors.go b/internal/cloudapi/v2/errors.go index 70b4d629b..5a3c7f460 100644 --- a/internal/cloudapi/v2/errors.go +++ b/internal/cloudapi/v2/errors.go @@ -50,6 +50,8 @@ const ( ErrorUnknownManifestVersion ServiceErrorCode = 1010 ErrorUnableToConvertOSTreeCommitStageMetadata ServiceErrorCode = 1011 ErrorMalformedOSBuildJobResult ServiceErrorCode = 1012 + ErrorGettingDepsolveJobStatus ServiceErrorCode = 1013 + ErrorDepsolveJobCanceled ServiceErrorCode = 1014 // Errors contained within this file ErrorUnspecified ServiceErrorCode = 10000 @@ -107,6 +109,8 @@ func getServiceErrors() serviceErrors { serviceError{ErrorUnknownManifestVersion, http.StatusInternalServerError, "Unknown manifest version"}, serviceError{ErrorUnableToConvertOSTreeCommitStageMetadata, http.StatusInternalServerError, "Unable to convert ostree commit stage metadata"}, serviceError{ErrorMalformedOSBuildJobResult, http.StatusInternalServerError, "OSBuildJobResult does not have expected fields set"}, + serviceError{ErrorGettingDepsolveJobStatus, http.StatusInternalServerError, "Unable to get depsolve job status"}, + serviceError{ErrorDepsolveJobCanceled, http.StatusInternalServerError, "Depsolve job was cancelled"}, serviceError{ErrorUnspecified, http.StatusInternalServerError, "Unspecified internal error "}, serviceError{ErrorNotHTTPError, http.StatusInternalServerError, "Error is not an instance of HTTPError"}, diff --git a/internal/cloudapi/v2/v2.go b/internal/cloudapi/v2/v2.go index 12025b530..75bf804e0 100644 --- a/internal/cloudapi/v2/v2.go +++ b/internal/cloudapi/v2/v2.go @@ -11,6 +11,7 @@ import ( "net/http" "strconv" "strings" + "time" "github.com/google/uuid" "github.com/labstack/echo/v4" @@ -199,18 +200,41 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { } packageSets := imageType.PackageSets(bp) - pkgSpecSets := make(map[string][]rpmmd.PackageSpec) - for name, packages := range packageSets { - pkgs, _, err := h.server.rpmMetadata.Depsolve(packages, repositories, distribution.ModulePlatformID(), arch.Name(), distribution.Releasever()) - var dnfError *rpmmd.DNFError - if err != nil && errors.As(err, &dnfError) { - return HTTPError(ErrorDNFError) - } else if err != nil { - return HTTPError(ErrorFailedToDepsolve) - } - pkgSpecSets[name] = pkgs + depsolveJobID, err := h.server.workers.EnqueueDepsolve(&worker.DepsolveJob{ + PackageSets: packageSets, + Repos: repositories, + ModulePlatformID: distribution.ModulePlatformID(), + Arch: arch.Name(), + Releasever: distribution.Releasever(), + }) + if err != nil { + return HTTPErrorWithInternal(ErrorEnqueueingJob, err) } + var depsolveResults worker.DepsolveJobResult + for { + status, _, err := h.server.workers.JobStatus(depsolveJobID, &depsolveResults) + if err != nil { + return HTTPErrorWithInternal(ErrorGettingDepsolveJobStatus, err) + } + if status.Canceled { + return HTTPErrorWithInternal(ErrorDepsolveJobCanceled, err) + } + if !status.Finished.IsZero() { + break + } + time.Sleep(50 * time.Millisecond) + } + + if depsolveResults.Error != "" { + if depsolveResults.ErrorType == worker.DepsolveErrorType { + return HTTPError(ErrorDNFError) + } + return HTTPErrorWithInternal(ErrorFailedToDepsolve, errors.New(depsolveResults.Error)) + } + + pkgSpecSets := depsolveResults.PackageSpecs + imageOptions := distro.ImageOptions{Size: imageType.Size(0)} if request.Customizations != nil && request.Customizations.Subscription != nil { imageOptions.Subscription = &distro.SubscriptionImageOptions{ diff --git a/internal/cloudapi/v2/v2_test.go b/internal/cloudapi/v2/v2_test.go index f76100695..dc7f71b1b 100644 --- a/internal/cloudapi/v2/v2_test.go +++ b/internal/cloudapi/v2/v2_test.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "testing" + "time" "github.com/stretchr/testify/require" @@ -19,7 +20,7 @@ import ( "github.com/osbuild/osbuild-composer/internal/worker" ) -func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server) { +func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context.CancelFunc) { rpmFixture := rpmmd_mock.BaseFixture(dir) rpm := rpmmd_mock.NewRPMMDMock(rpmFixture) require.NotNil(t, rpm) @@ -31,14 +32,38 @@ func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server) { v2Server := v2.NewServer(rpmFixture.Workers, rpm, distros, "image-builder.service") require.NotNil(t, v2Server) - return v2Server, rpmFixture.Workers + // start a routine which just completes depsolve jobs + depsolveContext, cancel := context.WithCancel(context.Background()) + go func() { + for { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond)) + defer cancel() + _, token, _, _, _, err := rpmFixture.Workers.RequestJob(ctx, test_distro.TestDistroName, []string{"depsolve"}) + if err != nil { + continue + } + rawMsg, err := json.Marshal(&worker.DepsolveJobResult{PackageSpecs: nil, Error: "", ErrorType: worker.ErrorType("")}) + require.NoError(t, err) + require.NoError(t, rpmFixture.Workers.FinishJob(token, rawMsg)) + + select { + case <-depsolveContext.Done(): + return + default: + continue + } + } + }() + + return v2Server, rpmFixture.Workers, cancel } func TestUnknownRoute(t *testing.T) { dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-") require.NoError(t, err) defer os.RemoveAll(dir) - srv, _ := newV2Server(t, dir) + srv, _, cancel := newV2Server(t, dir) + defer cancel() test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "GET", "/api/image-builder-composer/v2/badroute", ``, http.StatusNotFound, ` { @@ -54,7 +79,8 @@ func TestGetError(t *testing.T) { dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-") require.NoError(t, err) defer os.RemoveAll(dir) - srv, _ := newV2Server(t, dir) + srv, _, cancel := newV2Server(t, dir) + defer cancel() test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "GET", "/api/image-builder-composer/v2/errors/4", ``, http.StatusOK, ` { @@ -79,7 +105,8 @@ func TestGetErrorList(t *testing.T) { dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-") require.NoError(t, err) defer os.RemoveAll(dir) - srv, _ := newV2Server(t, dir) + srv, _, cancel := newV2Server(t, dir) + defer cancel() test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "GET", "/api/image-builder-composer/v2/errors?page=3&size=1", ``, http.StatusOK, ` { @@ -100,7 +127,8 @@ func TestCompose(t *testing.T) { dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-") require.NoError(t, err) defer os.RemoveAll(dir) - srv, _ := newV2Server(t, dir) + srv, _, cancel := newV2Server(t, dir) + defer cancel() // unsupported distribution test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "POST", "/api/image-builder-composer/v2/compose", fmt.Sprintf(` @@ -224,7 +252,8 @@ func TestComposeStatusSuccess(t *testing.T) { dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-") require.NoError(t, err) defer os.RemoveAll(dir) - srv, wrksrv := newV2Server(t, dir) + srv, wrksrv, cancel := newV2Server(t, dir) + defer cancel() test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "POST", "/api/image-builder-composer/v2/compose", fmt.Sprintf(` { @@ -289,7 +318,8 @@ func TestComposeStatusFailure(t *testing.T) { dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-") require.NoError(t, err) defer os.RemoveAll(dir) - srv, wrksrv := newV2Server(t, dir) + srv, wrksrv, cancel := newV2Server(t, dir) + defer cancel() test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "POST", "/api/image-builder-composer/v2/compose", fmt.Sprintf(` { @@ -338,7 +368,8 @@ func TestComposeCustomizations(t *testing.T) { dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-") require.NoError(t, err) defer os.RemoveAll(dir) - srv, _ := newV2Server(t, dir) + srv, _, cancel := newV2Server(t, dir) + defer cancel() test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "POST", "/api/image-builder-composer/v2/compose", fmt.Sprintf(` { diff --git a/internal/worker/json.go b/internal/worker/json.go index 718a555a3..cd4459fc0 100644 --- a/internal/worker/json.go +++ b/internal/worker/json.go @@ -81,9 +81,17 @@ type DepsolveJob struct { Releasever string `json:"releasever"` } +type ErrorType string + +const ( + DepsolveErrorType ErrorType = "depsolve" + OtherErrorType ErrorType = "other" +) + type DepsolveJobResult struct { PackageSpecs map[string][]rpmmd.PackageSpec `json:"package_specs"` Error string `json:"error"` + ErrorType ErrorType `json:"error_type"` } // diff --git a/internal/worker/server.go b/internal/worker/server.go index cc1004213..92c0ad61e 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -104,6 +104,10 @@ func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, bui return s.jobs.Enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...)) } +func (s *Server) EnqueueDepsolve(job *DepsolveJob) (uuid.UUID, error) { + return s.jobs.Enqueue("depsolve", job, nil) +} + func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, []uuid.UUID, error) { rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id) if err != nil {