From 9aef7bfc475af1af62bb45f952bcd278c72955b9 Mon Sep 17 00:00:00 2001 From: Achilleas Koutsou Date: Fri, 10 Sep 2021 17:13:03 +0200 Subject: [PATCH] osbuild-worker: attach pipeline names to jobs Pipeline names are added to each job before adding to the queue. When a job is finished, the names are copied to the Result object as well. This is done for both OSBuild and Koji jobs. The pipeline names in the result are primarily used to separate package lists into build and payload/image packages in two cases: 1. Koji builds: for reporting the build root and image package lists to Koji (in Koji finalize). 2. Cloud API (v1 and v2): for reporting the payload packages in the metadata request. The pipeline names are also used to print the system log output in the order in which pipelines are executed. This still isn't used when printing the OSBuild Result (osbuild2.Result.Write()) and we still rely on sorting by pipeline name (see https://github.com/osbuild/osbuild-composer/pull/1330). --- cmd/osbuild-worker/jobimpl-koji-finalize.go | 29 ++--- cmd/osbuild-worker/jobimpl-osbuild-koji.go | 3 + cmd/osbuild-worker/jobimpl-osbuild.go | 10 +- internal/cloudapi/v1/v1.go | 26 +++-- internal/cloudapi/v2/v2.go | 116 +++++++++----------- internal/kojiapi/server.go | 16 ++- internal/weldr/api.go | 4 + 7 files changed, 107 insertions(+), 97 deletions(-) diff --git a/cmd/osbuild-worker/jobimpl-koji-finalize.go b/cmd/osbuild-worker/jobimpl-koji-finalize.go index 42ad1fae7..9754cc5a8 100644 --- a/cmd/osbuild-worker/jobimpl-koji-finalize.go +++ b/cmd/osbuild-worker/jobimpl-koji-finalize.go @@ -8,7 +8,6 @@ import ( "net/url" "time" - osbuild "github.com/osbuild/osbuild-composer/internal/osbuild2" "github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/upload/koji" "github.com/osbuild/osbuild-composer/internal/worker" @@ -133,10 +132,14 @@ func (impl *KojiFinalizeJobImpl) Run(job worker.Job) error { var buildRoots []koji.BuildRoot var images []koji.Image for i, buildArgs := range osbuildKojiResults { - buildPipelineMd := buildArgs.OSBuildOutput.Metadata["build"] - buildRPMs := rpmmd.OSBuildMetadataToRPMs(buildPipelineMd) + buildRPMs := make([]rpmmd.RPM, 0) + // collect packages from stages in build pipelines + for _, plName := range buildArgs.PipelineNames.Build { + buildPipelineMd := buildArgs.OSBuildOutput.Metadata[plName] + buildRPMs = append(buildRPMs, rpmmd.OSBuildMetadataToRPMs(buildPipelineMd)...) + } // this dedupe is usually not necessary since we generally only have - // one rpm stage in the build pipeline, but it's not invalid to have + // one rpm stage in one build pipeline, but it's not invalid to have // multiple buildRPMs = rpmmd.DeduplicateRPMs(buildRPMs) buildRoots = append(buildRoots, koji.BuildRoot{ @@ -157,21 +160,13 @@ func (impl *KojiFinalizeJobImpl) Run(job worker.Job) error { RPMs: buildRPMs, }) - // collect metadata from all other pipelines - // use pipeline name + stage name as key while collecting since all RPM - // stage metadata will have the same key within a single pipeline - imagePipelinesMd := make(map[string]osbuild.StageMetadata) - for pipelineName, pipelineMetadata := range buildArgs.OSBuildOutput.Metadata { - if pipelineName == "build" { - continue - } - for stageName, stageMetadata := range pipelineMetadata { - imagePipelinesMd[pipelineName+":"+stageName] = stageMetadata - } + // collect packages from stages in payload pipelines + imageRPMs := make([]rpmmd.RPM, 0) + for _, plName := range buildArgs.PipelineNames.Payload { + payloadPipelineMd := buildArgs.OSBuildOutput.Metadata[plName] + imageRPMs = append(imageRPMs, rpmmd.OSBuildMetadataToRPMs(payloadPipelineMd)...) } - // collect packages from all stages - imageRPMs := rpmmd.OSBuildMetadataToRPMs(imagePipelinesMd) // deduplicate imageRPMs = rpmmd.DeduplicateRPMs(imageRPMs) diff --git a/cmd/osbuild-worker/jobimpl-osbuild-koji.go b/cmd/osbuild-worker/jobimpl-osbuild-koji.go index c3d50249e..46ce1016b 100644 --- a/cmd/osbuild-worker/jobimpl-osbuild-koji.go +++ b/cmd/osbuild-worker/jobimpl-osbuild-koji.go @@ -116,6 +116,9 @@ func (impl *OSBuildKojiJobImpl) Run(job worker.Job) error { } } + // copy pipeline info to the result + result.PipelineNames = args.PipelineNames + err = job.Update(&result) if err != nil { return fmt.Errorf("Error reporting job result: %v", err) diff --git a/cmd/osbuild-worker/jobimpl-osbuild.go b/cmd/osbuild-worker/jobimpl-osbuild.go index e3536173c..98b84aaf9 100644 --- a/cmd/osbuild-worker/jobimpl-osbuild.go +++ b/cmd/osbuild-worker/jobimpl-osbuild.go @@ -94,6 +94,8 @@ func (impl *OSBuildJobImpl) Run(job worker.Job) error { } args.Manifest = manifestJR.Manifest } + // copy pipeline info to the result + osbuildJobResult.PipelineNames = args.PipelineNames // The specification allows multiple upload targets because it is an array, but we don't support it. // Return an error to osbuild-composer. @@ -121,7 +123,13 @@ func (impl *OSBuildJobImpl) Run(job worker.Job) error { } // Include pipeline stages output inside the worker's logs. - for pipelineName, pipelineLog := range osbuildJobResult.OSBuildOutput.Log { + // Order pipelines based on PipelineNames from job + for _, pipelineName := range osbuildJobResult.PipelineNames.All() { + pipelineLog, hasLog := osbuildJobResult.OSBuildOutput.Log[pipelineName] + if !hasLog { + // no pipeline output + continue + } log.Printf("%s pipeline results:\n", pipelineName) for _, stageResult := range pipelineLog { if stageResult.Success { diff --git a/internal/cloudapi/v1/v1.go b/internal/cloudapi/v1/v1.go index 5a3e6fa3f..4e04d6033 100644 --- a/internal/cloudapi/v1/v1.go +++ b/internal/cloudapi/v1/v1.go @@ -107,9 +107,10 @@ func (h *apiHandlers) Compose(ctx echo.Context) error { // imagerequest type imageRequest struct { - manifest distro.Manifest - arch string - exports []string + manifest distro.Manifest + arch string + exports []string + pipelineNames worker.PipelineNames } imageRequests := make([]imageRequest, len(request.ImageRequests)) var targets []*target.Target @@ -243,6 +244,10 @@ func (h *apiHandlers) Compose(ctx echo.Context) error { imageRequests[i].manifest = manifest imageRequests[i].arch = arch.Name() imageRequests[i].exports = imageType.Exports() + imageRequests[i].pipelineNames = worker.PipelineNames{ + Build: imageType.BuildPipelines(), + Payload: imageType.PayloadPipelines(), + } uploadRequest := ir.UploadRequest /* oneOf is not supported by the openapi generator so marshal and unmarshal the uploadrequest based on the type */ @@ -377,9 +382,10 @@ func (h *apiHandlers) Compose(ctx echo.Context) error { } id, err := h.server.workers.EnqueueOSBuild(ir.arch, &worker.OSBuildJob{ - Manifest: ir.manifest, - Targets: targets, - Exports: ir.exports, + Manifest: ir.manifest, + Targets: targets, + Exports: ir.exports, + PipelineNames: &ir.pipelineNames, }) if err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "Failed to enqueue manifest") @@ -527,10 +533,10 @@ func (h *apiHandlers) ComposeMetadata(ctx echo.Context, id string) error { } var ostreeCommitMetadata *osbuild.OSTreeCommitStageMetadata - var rpmStagesMd []osbuild.RPMStageMetadata // collect non-build rpm stage metadata - for plName, plMd := range result.OSBuildOutput.Metadata { - if plName == "build" { - // skip build pipeline + var rpmStagesMd []osbuild.RPMStageMetadata // collect rpm stage metadata from payload pipelines + for _, plName := range job.PipelineNames.Payload { + plMd, hasMd := result.OSBuildOutput.Metadata[plName] + if !hasMd { continue } for _, stageMd := range plMd { diff --git a/internal/cloudapi/v2/v2.go b/internal/cloudapi/v2/v2.go index 88a70a229..5ba97170f 100644 --- a/internal/cloudapi/v2/v2.go +++ b/internal/cloudapi/v2/v2.go @@ -10,7 +10,6 @@ import ( "math/big" "net/http" "strconv" - "strings" "time" "github.com/google/uuid" @@ -21,7 +20,7 @@ import ( "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distroregistry" - "github.com/osbuild/osbuild-composer/internal/osbuild1" + osbuild "github.com/osbuild/osbuild-composer/internal/osbuild2" "github.com/osbuild/osbuild-composer/internal/ostree" "github.com/osbuild/osbuild-composer/internal/prometheus" "github.com/osbuild/osbuild-composer/internal/rpmmd" @@ -151,10 +150,11 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { } var imageRequest struct { - manifest distro.Manifest - arch string - exports []string - target *target.Target + manifest distro.Manifest + arch string + exports []string + target *target.Target + pipelineNames worker.PipelineNames } // use the same seed for all images so we get the same IDs @@ -287,6 +287,10 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { imageRequest.manifest = manifest imageRequest.arch = arch.Name() imageRequest.exports = imageType.Exports() + imageRequest.pipelineNames = worker.PipelineNames{ + Build: imageType.BuildPipelines(), + Payload: imageType.PayloadPipelines(), + } /* oneOf is not supported by the openapi generator so marshal and unmarshal the uploadrequest based on the type */ switch ir.ImageType { @@ -403,9 +407,10 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { } id, err := h.server.workers.EnqueueOSBuild(imageRequest.arch, &worker.OSBuildJob{ - Manifest: imageRequest.manifest, - Targets: []*target.Target{imageRequest.target}, - Exports: imageRequest.exports, + Manifest: imageRequest.manifest, + Targets: []*target.Target{imageRequest.target}, + Exports: imageRequest.exports, + PipelineNames: &imageRequest.pipelineNames, }) if err != nil { return HTTPErrorWithInternal(ErrorEnqueueingJob, err) @@ -575,63 +580,29 @@ func (h *apiHandlers) GetComposeMetadata(ctx echo.Context, id string) error { }) } - manifestVer, err := job.Manifest.Version() - if err != nil { - return HTTPError(ErrorFailedToParseManifestVersion) - } - - if result.OSBuildOutput == nil || result.OSBuildOutput.Assembler == nil { + if result.OSBuildOutput == nil || len(result.OSBuildOutput.Log) == 0 { + // no osbuild output recorded for job, error return HTTPError(ErrorMalformedOSBuildJobResult) } - var rpms []rpmmd.RPM - var ostreeCommitResult *osbuild1.StageResult - var coreStages []osbuild1.StageResult - switch manifestVer { - case "1": - coreStages = result.OSBuildOutput.Stages - if assemblerResult := result.OSBuildOutput.Assembler; assemblerResult.Name == "org.osbuild.ostree.commit" { - ostreeCommitResult = result.OSBuildOutput.Assembler + var ostreeCommitMetadata *osbuild.OSTreeCommitStageMetadata + var rpmStagesMd []osbuild.RPMStageMetadata // collect rpm stage metadata from payload pipelines + for _, plName := range job.PipelineNames.Payload { + plMd, hasMd := result.OSBuildOutput.Metadata[plName] + if !hasMd { + continue } - case "2": - // v2 manifest results store all stage output in the main stages - // here we filter out the build stages to collect only the RPMs for the - // core stages - // the filtering relies on two assumptions: - // 1. the build pipeline is named "build" - // 2. the stage results from v2 manifests when converted to v1 are - // named by prefixing the pipeline name - for _, stage := range result.OSBuildOutput.Stages { - if !strings.HasPrefix(stage.Name, "build") { - coreStages = append(coreStages, stage) + for _, stageMd := range plMd { + switch md := stageMd.(type) { + case *osbuild.RPMStageMetadata: + rpmStagesMd = append(rpmStagesMd, *md) + case *osbuild.OSTreeCommitStageMetadata: + ostreeCommitMetadata = md } } - // find the ostree.commit stage - for idx, stage := range result.OSBuildOutput.Stages { - if strings.HasSuffix(stage.Name, "org.osbuild.ostree.commit") { - ostreeCommitResult = &result.OSBuildOutput.Stages[idx] - break - } - } - default: - return HTTPError(ErrorUnknownManifestVersion) } - rpms = rpmmd.OSBuildStagesToRPMs(coreStages) - - packages := make([]PackageMetadata, len(rpms)) - for idx, rpm := range rpms { - packages[idx] = PackageMetadata{ - Type: rpm.Type, - Name: rpm.Name, - Version: rpm.Version, - Release: rpm.Release, - Epoch: rpm.Epoch, - Arch: rpm.Arch, - Sigmd5: rpm.Sigmd5, - Signature: rpm.Signature, - } - } + packages := stagesToPackageMetadata(rpmStagesMd) resp := &ComposeMetadata{ ObjectReference: ObjectReference{ @@ -642,13 +613,30 @@ func (h *apiHandlers) GetComposeMetadata(ctx echo.Context, id string) error { Packages: &packages, } - if ostreeCommitResult != nil && ostreeCommitResult.Metadata != nil { - commitMetadata, ok := ostreeCommitResult.Metadata.(*osbuild1.OSTreeCommitStageMetadata) - if !ok { - return HTTPError(ErrorUnableToConvertOSTreeCommitStageMetadata) - } - resp.OstreeCommit = &commitMetadata.Compose.OSTreeCommit + if ostreeCommitMetadata != nil { + resp.OstreeCommit = &ostreeCommitMetadata.Compose.OSTreeCommit } return ctx.JSON(200, resp) } + +func stagesToPackageMetadata(stages []osbuild.RPMStageMetadata) []PackageMetadata { + packages := make([]PackageMetadata, 0) + for _, md := range stages { + for _, rpm := range md.Packages { + packages = append(packages, + PackageMetadata{ + Type: "rpm", + Name: rpm.Name, + Version: rpm.Version, + Release: rpm.Release, + Epoch: rpm.Epoch, + Arch: rpm.Arch, + Sigmd5: rpm.SigMD5, + Signature: rpmmd.PackageMetadataToSignature(rpm), + }, + ) + } + } + return packages +} diff --git a/internal/kojiapi/server.go b/internal/kojiapi/server.go index 4874edf95..b26ccd8d8 100644 --- a/internal/kojiapi/server.go +++ b/internal/kojiapi/server.go @@ -82,10 +82,11 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { } type imageRequest struct { - manifest distro.Manifest - arch string - filename string - exports []string + manifest distro.Manifest + arch string + filename string + exports []string + pipelineNames *worker.PipelineNames } imageRequests := make([]imageRequest, len(request.ImageRequests)) @@ -140,6 +141,10 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { imageRequests[i].arch = arch.Name() imageRequests[i].filename = imageType.Filename() imageRequests[i].exports = imageType.Exports() + imageRequests[i].pipelineNames = &worker.PipelineNames{ + Build: imageType.BuildPipelines(), + Payload: imageType.PayloadPipelines(), + } kojiFilenames[i] = fmt.Sprintf( "%s-%s-%s.%s%s", @@ -168,6 +173,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { Manifest: ir.manifest, ImageName: ir.filename, Exports: ir.exports, + PipelineNames: ir.pipelineNames, KojiServer: request.Koji.Server, KojiDirectory: kojiDirectory, KojiFilename: kojiFilenames[i], @@ -432,8 +438,8 @@ func (h *apiHandlers) getFinalizeJob(id uuid.UUID) (*worker.KojiFinalizeJob, []u } // getInitJob retrieves a KojiInitJob from the job queue given its ID. -// It returns an error if the ID matches a job of a different type. func (h *apiHandlers) getInitJob(id uuid.UUID) (*worker.KojiInitJob, error) { + // It returns an error if the ID matches a job of a different type. job := new(worker.KojiInitJob) jobType, _, _, err := h.server.workers.Job(id, job) if err != nil { diff --git a/internal/weldr/api.go b/internal/weldr/api.go index f89598b81..a9fc95aa5 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -2350,6 +2350,10 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request ImageName: imageType.Filename(), StreamOptimized: imageType.Name() == "vmdk", // https://github.com/osbuild/osbuild/issues/528 Exports: imageType.Exports(), + PipelineNames: &worker.PipelineNames{ + Build: imageType.BuildPipelines(), + Payload: imageType.PayloadPipelines(), + }, }) if err == nil { err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId, packageSets["packages"])