diff --git a/internal/cloudapi/v2/handler.go b/internal/cloudapi/v2/handler.go index bdbbdad4b..cde88bfa6 100644 --- a/internal/cloudapi/v2/handler.go +++ b/internal/cloudapi/v2/handler.go @@ -2,7 +2,6 @@ package v2 import ( - "context" "crypto/rand" "encoding/json" "fmt" @@ -11,16 +10,13 @@ import ( "net/http" "strconv" "strings" - "time" "github.com/google/uuid" "github.com/labstack/echo/v4" - "github.com/sirupsen/logrus" "github.com/osbuild/osbuild-composer/internal/auth" "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/distro" - "github.com/osbuild/osbuild-composer/internal/jobqueue" osbuild "github.com/osbuild/osbuild-composer/internal/osbuild2" "github.com/osbuild/osbuild-composer/internal/ostree" "github.com/osbuild/osbuild-composer/internal/rpmmd" @@ -442,12 +438,12 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { var id uuid.UUID if request.Koji != nil { - id, err = enqueueKojiCompose(h.server.workers, uint64(request.Koji.TaskId), request.Koji.Server, request.Koji.Name, request.Koji.Version, request.Koji.Release, distribution, bp, manifestSeed, irs, channel) + id, err = h.server.enqueueKojiCompose(uint64(request.Koji.TaskId), request.Koji.Server, request.Koji.Name, request.Koji.Version, request.Koji.Release, distribution, bp, manifestSeed, irs, channel) if err != nil { return err } } else { - id, err = enqueueCompose(h.server.workers, distribution, bp, manifestSeed, irs, channel) + id, err = h.server.enqueueCompose(distribution, bp, manifestSeed, irs, channel) if err != nil { return err } @@ -465,215 +461,6 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { }) } -func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) { - var id uuid.UUID - if len(irs) != 1 { - return id, HTTPError(ErrorInvalidNumberOfImageBuilds) - } - ir := irs[0] - - depsolveJobID, err := workers.EnqueueDepsolve(&worker.DepsolveJob{ - PackageSets: ir.imageType.PackageSets(bp), - Repos: ir.repositories, - ModulePlatformID: distribution.ModulePlatformID(), - Arch: ir.arch.Name(), - Releasever: distribution.Releasever(), - PackageSetsRepos: ir.packageSetsRepositories, - }, channel) - if err != nil { - return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) - } - - manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel) - if err != nil { - return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) - } - - id, err = workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{ - Targets: []*target.Target{ir.target}, - Exports: ir.imageType.Exports(), - StreamOptimized: ir.imageType.Name() == "vmdk", // https://github.com/osbuild/osbuild/issues/528, - PipelineNames: &worker.PipelineNames{ - Build: ir.imageType.BuildPipelines(), - Payload: ir.imageType.PayloadPipelines(), - }, - }, manifestJobID, channel) - if err != nil { - return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) - go generateManifest(ctx, cancel, workers, depsolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, bp.Customizations) - - return id, nil -} - -func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) { - var id uuid.UUID - kojiDirectory := "osbuild-composer-koji-" + uuid.New().String() - - initID, err := workers.EnqueueKojiInit(&worker.KojiInitJob{ - Server: server, - Name: name, - Version: version, - Release: release, - }, channel) - if err != nil { - return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) - } - - var kojiFilenames []string - var buildIDs []uuid.UUID - for _, ir := range irs { - depsolveJobID, err := workers.EnqueueDepsolve(&worker.DepsolveJob{ - PackageSets: ir.imageType.PackageSets(bp), - Repos: ir.repositories, - ModulePlatformID: distribution.ModulePlatformID(), - Arch: ir.arch.Name(), - Releasever: distribution.Releasever(), - PackageSetsRepos: ir.packageSetsRepositories, - }, channel) - if err != nil { - return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) - } - - manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel) - if err != nil { - return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) - } - kojiFilename := fmt.Sprintf( - "%s-%s-%s.%s%s", - name, - version, - release, - ir.arch.Name(), - splitExtension(ir.imageType.Filename()), - ) - buildID, err := workers.EnqueueOSBuildKojiAsDependency(ir.arch.Name(), &worker.OSBuildKojiJob{ - ImageName: ir.imageType.Filename(), - Exports: ir.imageType.Exports(), - PipelineNames: &worker.PipelineNames{ - Build: ir.imageType.BuildPipelines(), - Payload: ir.imageType.PayloadPipelines(), - }, - KojiServer: server, - KojiDirectory: kojiDirectory, - KojiFilename: kojiFilename, - }, manifestJobID, initID, channel) - if err != nil { - return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) - } - kojiFilenames = append(kojiFilenames, kojiFilename) - buildIDs = append(buildIDs, buildID) - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) - go generateManifest(ctx, cancel, workers, depsolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, bp.Customizations) - } - id, err = workers.EnqueueKojiFinalize(&worker.KojiFinalizeJob{ - Server: server, - Name: name, - Version: version, - Release: release, - KojiFilenames: kojiFilenames, - KojiDirectory: kojiDirectory, - TaskID: taskID, - StartTime: uint64(time.Now().Unix()), - }, initID, buildIDs, channel) - if err != nil { - return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) - } - - return id, nil -} - -func generateManifest(ctx context.Context, cancel context.CancelFunc, workers *worker.Server, depsolveJobID uuid.UUID, manifestJobID uuid.UUID, imageType distro.ImageType, repos []rpmmd.RepoConfig, options distro.ImageOptions, seed int64, b *blueprint.Customizations) { - defer cancel() - - // wait until job is in a pending state - var token uuid.UUID - var dynArgs []json.RawMessage - var err error - logWithId := logrus.WithField("jobId", manifestJobID) - for { - _, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID) - if err == jobqueue.ErrNotPending { - logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish") - time.Sleep(time.Millisecond * 50) - select { - case <-ctx.Done(): - logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, returning to avoid dangling routines") - break - default: - continue - } - } - if err != nil { - logWithId.Errorf("Error requesting manifest job: %v", err) - return - } - break - } - - var jobResult *worker.ManifestJobByIDResult = &worker.ManifestJobByIDResult{ - Manifest: nil, - } - - defer func() { - if jobResult.JobError != nil { - logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err) - } - - result, err := json.Marshal(jobResult) - if err != nil { - logWithId.Errorf("Error marshalling manifest job results: %v", err) - } - - err = workers.FinishJob(token, result) - if err != nil { - logWithId.Errorf("Error finishing manifest job: %v", err) - } - }() - - if len(dynArgs) == 0 { - reason := "No dynamic arguments" - jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorNoDynamicArgs, reason) - return - } - - var depsolveResults worker.DepsolveJobResult - err = json.Unmarshal(dynArgs[0], &depsolveResults) - if err != nil { - reason := "Error parsing dynamic arguments" - jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingDynamicArgs, reason) - return - } - - _, _, err = workers.DepsolveJobStatus(depsolveJobID, &depsolveResults) - if err != nil { - reason := "Error reading depsolve status" - jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason) - return - } - - if jobErr := depsolveResults.JobError; jobErr != nil { - if jobErr.ID == clienterrors.ErrorDNFDepsolveError || jobErr.ID == clienterrors.ErrorDNFMarkingErrors { - jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency input, bad package set requested") - return - } - jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency") - return - } - - manifest, err := imageType.Manifest(b, options, repos, depsolveResults.PackageSpecs, seed) - if err != nil { - reason := "Error generating manifest" - jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason) - return - } - - jobResult.Manifest = manifest -} - func imageTypeFromApiImageType(it ImageTypes, arch distro.Arch) string { switch it { case ImageTypesAws: diff --git a/internal/cloudapi/v2/server.go b/internal/cloudapi/v2/server.go index 954a2cca2..cc8463aa2 100644 --- a/internal/cloudapi/v2/server.go +++ b/internal/cloudapi/v2/server.go @@ -1,15 +1,27 @@ package v2 import ( + "context" + "encoding/json" + "fmt" "net/http" + "time" + "github.com/google/uuid" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" + "github.com/sirupsen/logrus" + "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/common" + "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distroregistry" + "github.com/osbuild/osbuild-composer/internal/jobqueue" "github.com/osbuild/osbuild-composer/internal/prometheus" + "github.com/osbuild/osbuild-composer/internal/rpmmd" + "github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/worker" + "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" ) // Server represents the state of the cloud Server @@ -49,3 +61,212 @@ func (s *Server) Handler(path string) http.Handler { return e } + +func (s *Server) enqueueCompose(distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) { + var id uuid.UUID + if len(irs) != 1 { + return id, HTTPError(ErrorInvalidNumberOfImageBuilds) + } + ir := irs[0] + + depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{ + PackageSets: ir.imageType.PackageSets(bp), + Repos: ir.repositories, + ModulePlatformID: distribution.ModulePlatformID(), + Arch: ir.arch.Name(), + Releasever: distribution.Releasever(), + PackageSetsRepos: ir.packageSetsRepositories, + }, channel) + if err != nil { + return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) + } + + manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel) + if err != nil { + return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) + } + + id, err = s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{ + Targets: []*target.Target{ir.target}, + Exports: ir.imageType.Exports(), + StreamOptimized: ir.imageType.Name() == "vmdk", // https://github.com/osbuild/osbuild/issues/528, + PipelineNames: &worker.PipelineNames{ + Build: ir.imageType.BuildPipelines(), + Payload: ir.imageType.PayloadPipelines(), + }, + }, manifestJobID, channel) + if err != nil { + return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + go generateManifest(ctx, cancel, s.workers, depsolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, bp.Customizations) + + return id, nil +} + +func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) { + var id uuid.UUID + kojiDirectory := "osbuild-composer-koji-" + uuid.New().String() + + initID, err := s.workers.EnqueueKojiInit(&worker.KojiInitJob{ + Server: server, + Name: name, + Version: version, + Release: release, + }, channel) + if err != nil { + return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) + } + + var kojiFilenames []string + var buildIDs []uuid.UUID + for _, ir := range irs { + depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{ + PackageSets: ir.imageType.PackageSets(bp), + Repos: ir.repositories, + ModulePlatformID: distribution.ModulePlatformID(), + Arch: ir.arch.Name(), + Releasever: distribution.Releasever(), + PackageSetsRepos: ir.packageSetsRepositories, + }, channel) + if err != nil { + return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) + } + + manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel) + if err != nil { + return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) + } + kojiFilename := fmt.Sprintf( + "%s-%s-%s.%s%s", + name, + version, + release, + ir.arch.Name(), + splitExtension(ir.imageType.Filename()), + ) + buildID, err := s.workers.EnqueueOSBuildKojiAsDependency(ir.arch.Name(), &worker.OSBuildKojiJob{ + ImageName: ir.imageType.Filename(), + Exports: ir.imageType.Exports(), + PipelineNames: &worker.PipelineNames{ + Build: ir.imageType.BuildPipelines(), + Payload: ir.imageType.PayloadPipelines(), + }, + KojiServer: server, + KojiDirectory: kojiDirectory, + KojiFilename: kojiFilename, + }, manifestJobID, initID, channel) + if err != nil { + return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) + } + kojiFilenames = append(kojiFilenames, kojiFilename) + buildIDs = append(buildIDs, buildID) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + go generateManifest(ctx, cancel, s.workers, depsolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, bp.Customizations) + } + id, err = s.workers.EnqueueKojiFinalize(&worker.KojiFinalizeJob{ + Server: server, + Name: name, + Version: version, + Release: release, + KojiFilenames: kojiFilenames, + KojiDirectory: kojiDirectory, + TaskID: taskID, + StartTime: uint64(time.Now().Unix()), + }, initID, buildIDs, channel) + if err != nil { + return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) + } + + return id, nil +} + +func generateManifest(ctx context.Context, cancel context.CancelFunc, workers *worker.Server, depsolveJobID uuid.UUID, manifestJobID uuid.UUID, imageType distro.ImageType, repos []rpmmd.RepoConfig, options distro.ImageOptions, seed int64, b *blueprint.Customizations) { + defer cancel() + + // wait until job is in a pending state + var token uuid.UUID + var dynArgs []json.RawMessage + var err error + logWithId := logrus.WithField("jobId", manifestJobID) + for { + _, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID) + if err == jobqueue.ErrNotPending { + logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish") + time.Sleep(time.Millisecond * 50) + select { + case <-ctx.Done(): + logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, returning to avoid dangling routines") + break + default: + continue + } + } + if err != nil { + logWithId.Errorf("Error requesting manifest job: %v", err) + return + } + break + } + + var jobResult *worker.ManifestJobByIDResult = &worker.ManifestJobByIDResult{ + Manifest: nil, + } + + defer func() { + if jobResult.JobError != nil { + logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err) + } + + result, err := json.Marshal(jobResult) + if err != nil { + logWithId.Errorf("Error marshalling manifest job results: %v", err) + } + + err = workers.FinishJob(token, result) + if err != nil { + logWithId.Errorf("Error finishing manifest job: %v", err) + } + }() + + if len(dynArgs) == 0 { + reason := "No dynamic arguments" + jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorNoDynamicArgs, reason) + return + } + + var depsolveResults worker.DepsolveJobResult + err = json.Unmarshal(dynArgs[0], &depsolveResults) + if err != nil { + reason := "Error parsing dynamic arguments" + jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingDynamicArgs, reason) + return + } + + _, _, err = workers.DepsolveJobStatus(depsolveJobID, &depsolveResults) + if err != nil { + reason := "Error reading depsolve status" + jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason) + return + } + + if jobErr := depsolveResults.JobError; jobErr != nil { + if jobErr.ID == clienterrors.ErrorDNFDepsolveError || jobErr.ID == clienterrors.ErrorDNFMarkingErrors { + jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency input, bad package set requested") + return + } + jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency") + return + } + + manifest, err := imageType.Manifest(b, options, repos, depsolveResults.PackageSpecs, seed) + if err != nil { + reason := "Error generating manifest" + jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason) + return + } + + jobResult.Manifest = manifest +}