osbuild-worker: attach pipeline names to jobs

Pipeline names are added to each job before adding to the queue. When a
job is finished, the names are copied to the Result object as well. This
is done for both OSBuild and Koji jobs.

The pipeline names in the result are primarily used to separate package
lists into build and payload/image packages in two cases:
1. Koji builds: for reporting the build root and image package lists to
   Koji (in Koji finalize).
2. Cloud API (v1 and v2): for reporting the payload packages in the
   metadata request.

The pipeline names are also used to print the system log output in the
order in which pipelines are executed. This still isn't used when
printing the OSBuild Result (osbuild2.Result.Write()) and we still rely
on sorting by pipeline name
(see https://github.com/osbuild/osbuild-composer/pull/1330).
This commit is contained in:
Achilleas Koutsou 2021-09-10 17:13:03 +02:00 committed by Ondřej Budai
parent 143eb5cb91
commit 9aef7bfc47
7 changed files with 107 additions and 97 deletions

View file

@ -8,7 +8,6 @@ import (
"net/url" "net/url"
"time" "time"
osbuild "github.com/osbuild/osbuild-composer/internal/osbuild2"
"github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/upload/koji" "github.com/osbuild/osbuild-composer/internal/upload/koji"
"github.com/osbuild/osbuild-composer/internal/worker" "github.com/osbuild/osbuild-composer/internal/worker"
@ -133,10 +132,14 @@ func (impl *KojiFinalizeJobImpl) Run(job worker.Job) error {
var buildRoots []koji.BuildRoot var buildRoots []koji.BuildRoot
var images []koji.Image var images []koji.Image
for i, buildArgs := range osbuildKojiResults { for i, buildArgs := range osbuildKojiResults {
buildPipelineMd := buildArgs.OSBuildOutput.Metadata["build"] buildRPMs := make([]rpmmd.RPM, 0)
buildRPMs := rpmmd.OSBuildMetadataToRPMs(buildPipelineMd) // collect packages from stages in build pipelines
for _, plName := range buildArgs.PipelineNames.Build {
buildPipelineMd := buildArgs.OSBuildOutput.Metadata[plName]
buildRPMs = append(buildRPMs, rpmmd.OSBuildMetadataToRPMs(buildPipelineMd)...)
}
// this dedupe is usually not necessary since we generally only have // this dedupe is usually not necessary since we generally only have
// one rpm stage in the build pipeline, but it's not invalid to have // one rpm stage in one build pipeline, but it's not invalid to have
// multiple // multiple
buildRPMs = rpmmd.DeduplicateRPMs(buildRPMs) buildRPMs = rpmmd.DeduplicateRPMs(buildRPMs)
buildRoots = append(buildRoots, koji.BuildRoot{ buildRoots = append(buildRoots, koji.BuildRoot{
@ -157,21 +160,13 @@ func (impl *KojiFinalizeJobImpl) Run(job worker.Job) error {
RPMs: buildRPMs, RPMs: buildRPMs,
}) })
// collect metadata from all other pipelines // collect packages from stages in payload pipelines
// use pipeline name + stage name as key while collecting since all RPM imageRPMs := make([]rpmmd.RPM, 0)
// stage metadata will have the same key within a single pipeline for _, plName := range buildArgs.PipelineNames.Payload {
imagePipelinesMd := make(map[string]osbuild.StageMetadata) payloadPipelineMd := buildArgs.OSBuildOutput.Metadata[plName]
for pipelineName, pipelineMetadata := range buildArgs.OSBuildOutput.Metadata { imageRPMs = append(imageRPMs, rpmmd.OSBuildMetadataToRPMs(payloadPipelineMd)...)
if pipelineName == "build" {
continue
}
for stageName, stageMetadata := range pipelineMetadata {
imagePipelinesMd[pipelineName+":"+stageName] = stageMetadata
}
} }
// collect packages from all stages
imageRPMs := rpmmd.OSBuildMetadataToRPMs(imagePipelinesMd)
// deduplicate // deduplicate
imageRPMs = rpmmd.DeduplicateRPMs(imageRPMs) imageRPMs = rpmmd.DeduplicateRPMs(imageRPMs)

View file

@ -116,6 +116,9 @@ func (impl *OSBuildKojiJobImpl) Run(job worker.Job) error {
} }
} }
// copy pipeline info to the result
result.PipelineNames = args.PipelineNames
err = job.Update(&result) err = job.Update(&result)
if err != nil { if err != nil {
return fmt.Errorf("Error reporting job result: %v", err) return fmt.Errorf("Error reporting job result: %v", err)

View file

@ -94,6 +94,8 @@ func (impl *OSBuildJobImpl) Run(job worker.Job) error {
} }
args.Manifest = manifestJR.Manifest args.Manifest = manifestJR.Manifest
} }
// copy pipeline info to the result
osbuildJobResult.PipelineNames = args.PipelineNames
// The specification allows multiple upload targets because it is an array, but we don't support it. // The specification allows multiple upload targets because it is an array, but we don't support it.
// Return an error to osbuild-composer. // Return an error to osbuild-composer.
@ -121,7 +123,13 @@ func (impl *OSBuildJobImpl) Run(job worker.Job) error {
} }
// Include pipeline stages output inside the worker's logs. // Include pipeline stages output inside the worker's logs.
for pipelineName, pipelineLog := range osbuildJobResult.OSBuildOutput.Log { // Order pipelines based on PipelineNames from job
for _, pipelineName := range osbuildJobResult.PipelineNames.All() {
pipelineLog, hasLog := osbuildJobResult.OSBuildOutput.Log[pipelineName]
if !hasLog {
// no pipeline output
continue
}
log.Printf("%s pipeline results:\n", pipelineName) log.Printf("%s pipeline results:\n", pipelineName)
for _, stageResult := range pipelineLog { for _, stageResult := range pipelineLog {
if stageResult.Success { if stageResult.Success {

View file

@ -107,9 +107,10 @@ func (h *apiHandlers) Compose(ctx echo.Context) error {
// imagerequest // imagerequest
type imageRequest struct { type imageRequest struct {
manifest distro.Manifest manifest distro.Manifest
arch string arch string
exports []string exports []string
pipelineNames worker.PipelineNames
} }
imageRequests := make([]imageRequest, len(request.ImageRequests)) imageRequests := make([]imageRequest, len(request.ImageRequests))
var targets []*target.Target var targets []*target.Target
@ -243,6 +244,10 @@ func (h *apiHandlers) Compose(ctx echo.Context) error {
imageRequests[i].manifest = manifest imageRequests[i].manifest = manifest
imageRequests[i].arch = arch.Name() imageRequests[i].arch = arch.Name()
imageRequests[i].exports = imageType.Exports() imageRequests[i].exports = imageType.Exports()
imageRequests[i].pipelineNames = worker.PipelineNames{
Build: imageType.BuildPipelines(),
Payload: imageType.PayloadPipelines(),
}
uploadRequest := ir.UploadRequest uploadRequest := ir.UploadRequest
/* oneOf is not supported by the openapi generator so marshal and unmarshal the uploadrequest based on the type */ /* oneOf is not supported by the openapi generator so marshal and unmarshal the uploadrequest based on the type */
@ -377,9 +382,10 @@ func (h *apiHandlers) Compose(ctx echo.Context) error {
} }
id, err := h.server.workers.EnqueueOSBuild(ir.arch, &worker.OSBuildJob{ id, err := h.server.workers.EnqueueOSBuild(ir.arch, &worker.OSBuildJob{
Manifest: ir.manifest, Manifest: ir.manifest,
Targets: targets, Targets: targets,
Exports: ir.exports, Exports: ir.exports,
PipelineNames: &ir.pipelineNames,
}) })
if err != nil { if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to enqueue manifest") return echo.NewHTTPError(http.StatusInternalServerError, "Failed to enqueue manifest")
@ -527,10 +533,10 @@ func (h *apiHandlers) ComposeMetadata(ctx echo.Context, id string) error {
} }
var ostreeCommitMetadata *osbuild.OSTreeCommitStageMetadata var ostreeCommitMetadata *osbuild.OSTreeCommitStageMetadata
var rpmStagesMd []osbuild.RPMStageMetadata // collect non-build rpm stage metadata var rpmStagesMd []osbuild.RPMStageMetadata // collect rpm stage metadata from payload pipelines
for plName, plMd := range result.OSBuildOutput.Metadata { for _, plName := range job.PipelineNames.Payload {
if plName == "build" { plMd, hasMd := result.OSBuildOutput.Metadata[plName]
// skip build pipeline if !hasMd {
continue continue
} }
for _, stageMd := range plMd { for _, stageMd := range plMd {

View file

@ -10,7 +10,6 @@ import (
"math/big" "math/big"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@ -21,7 +20,7 @@ import (
"github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/distroregistry" "github.com/osbuild/osbuild-composer/internal/distroregistry"
"github.com/osbuild/osbuild-composer/internal/osbuild1" osbuild "github.com/osbuild/osbuild-composer/internal/osbuild2"
"github.com/osbuild/osbuild-composer/internal/ostree" "github.com/osbuild/osbuild-composer/internal/ostree"
"github.com/osbuild/osbuild-composer/internal/prometheus" "github.com/osbuild/osbuild-composer/internal/prometheus"
"github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/rpmmd"
@ -151,10 +150,11 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
} }
var imageRequest struct { var imageRequest struct {
manifest distro.Manifest manifest distro.Manifest
arch string arch string
exports []string exports []string
target *target.Target target *target.Target
pipelineNames worker.PipelineNames
} }
// use the same seed for all images so we get the same IDs // use the same seed for all images so we get the same IDs
@ -287,6 +287,10 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
imageRequest.manifest = manifest imageRequest.manifest = manifest
imageRequest.arch = arch.Name() imageRequest.arch = arch.Name()
imageRequest.exports = imageType.Exports() imageRequest.exports = imageType.Exports()
imageRequest.pipelineNames = worker.PipelineNames{
Build: imageType.BuildPipelines(),
Payload: imageType.PayloadPipelines(),
}
/* oneOf is not supported by the openapi generator so marshal and unmarshal the uploadrequest based on the type */ /* oneOf is not supported by the openapi generator so marshal and unmarshal the uploadrequest based on the type */
switch ir.ImageType { switch ir.ImageType {
@ -403,9 +407,10 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
} }
id, err := h.server.workers.EnqueueOSBuild(imageRequest.arch, &worker.OSBuildJob{ id, err := h.server.workers.EnqueueOSBuild(imageRequest.arch, &worker.OSBuildJob{
Manifest: imageRequest.manifest, Manifest: imageRequest.manifest,
Targets: []*target.Target{imageRequest.target}, Targets: []*target.Target{imageRequest.target},
Exports: imageRequest.exports, Exports: imageRequest.exports,
PipelineNames: &imageRequest.pipelineNames,
}) })
if err != nil { if err != nil {
return HTTPErrorWithInternal(ErrorEnqueueingJob, err) return HTTPErrorWithInternal(ErrorEnqueueingJob, err)
@ -575,63 +580,29 @@ func (h *apiHandlers) GetComposeMetadata(ctx echo.Context, id string) error {
}) })
} }
manifestVer, err := job.Manifest.Version() if result.OSBuildOutput == nil || len(result.OSBuildOutput.Log) == 0 {
if err != nil { // no osbuild output recorded for job, error
return HTTPError(ErrorFailedToParseManifestVersion)
}
if result.OSBuildOutput == nil || result.OSBuildOutput.Assembler == nil {
return HTTPError(ErrorMalformedOSBuildJobResult) return HTTPError(ErrorMalformedOSBuildJobResult)
} }
var rpms []rpmmd.RPM var ostreeCommitMetadata *osbuild.OSTreeCommitStageMetadata
var ostreeCommitResult *osbuild1.StageResult var rpmStagesMd []osbuild.RPMStageMetadata // collect rpm stage metadata from payload pipelines
var coreStages []osbuild1.StageResult for _, plName := range job.PipelineNames.Payload {
switch manifestVer { plMd, hasMd := result.OSBuildOutput.Metadata[plName]
case "1": if !hasMd {
coreStages = result.OSBuildOutput.Stages continue
if assemblerResult := result.OSBuildOutput.Assembler; assemblerResult.Name == "org.osbuild.ostree.commit" {
ostreeCommitResult = result.OSBuildOutput.Assembler
} }
case "2": for _, stageMd := range plMd {
// v2 manifest results store all stage output in the main stages switch md := stageMd.(type) {
// here we filter out the build stages to collect only the RPMs for the case *osbuild.RPMStageMetadata:
// core stages rpmStagesMd = append(rpmStagesMd, *md)
// the filtering relies on two assumptions: case *osbuild.OSTreeCommitStageMetadata:
// 1. the build pipeline is named "build" ostreeCommitMetadata = md
// 2. the stage results from v2 manifests when converted to v1 are
// named by prefixing the pipeline name
for _, stage := range result.OSBuildOutput.Stages {
if !strings.HasPrefix(stage.Name, "build") {
coreStages = append(coreStages, stage)
} }
} }
// find the ostree.commit stage
for idx, stage := range result.OSBuildOutput.Stages {
if strings.HasSuffix(stage.Name, "org.osbuild.ostree.commit") {
ostreeCommitResult = &result.OSBuildOutput.Stages[idx]
break
}
}
default:
return HTTPError(ErrorUnknownManifestVersion)
} }
rpms = rpmmd.OSBuildStagesToRPMs(coreStages) packages := stagesToPackageMetadata(rpmStagesMd)
packages := make([]PackageMetadata, len(rpms))
for idx, rpm := range rpms {
packages[idx] = PackageMetadata{
Type: rpm.Type,
Name: rpm.Name,
Version: rpm.Version,
Release: rpm.Release,
Epoch: rpm.Epoch,
Arch: rpm.Arch,
Sigmd5: rpm.Sigmd5,
Signature: rpm.Signature,
}
}
resp := &ComposeMetadata{ resp := &ComposeMetadata{
ObjectReference: ObjectReference{ ObjectReference: ObjectReference{
@ -642,13 +613,30 @@ func (h *apiHandlers) GetComposeMetadata(ctx echo.Context, id string) error {
Packages: &packages, Packages: &packages,
} }
if ostreeCommitResult != nil && ostreeCommitResult.Metadata != nil { if ostreeCommitMetadata != nil {
commitMetadata, ok := ostreeCommitResult.Metadata.(*osbuild1.OSTreeCommitStageMetadata) resp.OstreeCommit = &ostreeCommitMetadata.Compose.OSTreeCommit
if !ok {
return HTTPError(ErrorUnableToConvertOSTreeCommitStageMetadata)
}
resp.OstreeCommit = &commitMetadata.Compose.OSTreeCommit
} }
return ctx.JSON(200, resp) return ctx.JSON(200, resp)
} }
func stagesToPackageMetadata(stages []osbuild.RPMStageMetadata) []PackageMetadata {
packages := make([]PackageMetadata, 0)
for _, md := range stages {
for _, rpm := range md.Packages {
packages = append(packages,
PackageMetadata{
Type: "rpm",
Name: rpm.Name,
Version: rpm.Version,
Release: rpm.Release,
Epoch: rpm.Epoch,
Arch: rpm.Arch,
Sigmd5: rpm.SigMD5,
Signature: rpmmd.PackageMetadataToSignature(rpm),
},
)
}
}
return packages
}

View file

@ -82,10 +82,11 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
} }
type imageRequest struct { type imageRequest struct {
manifest distro.Manifest manifest distro.Manifest
arch string arch string
filename string filename string
exports []string exports []string
pipelineNames *worker.PipelineNames
} }
imageRequests := make([]imageRequest, len(request.ImageRequests)) imageRequests := make([]imageRequest, len(request.ImageRequests))
@ -140,6 +141,10 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
imageRequests[i].arch = arch.Name() imageRequests[i].arch = arch.Name()
imageRequests[i].filename = imageType.Filename() imageRequests[i].filename = imageType.Filename()
imageRequests[i].exports = imageType.Exports() imageRequests[i].exports = imageType.Exports()
imageRequests[i].pipelineNames = &worker.PipelineNames{
Build: imageType.BuildPipelines(),
Payload: imageType.PayloadPipelines(),
}
kojiFilenames[i] = fmt.Sprintf( kojiFilenames[i] = fmt.Sprintf(
"%s-%s-%s.%s%s", "%s-%s-%s.%s%s",
@ -168,6 +173,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
Manifest: ir.manifest, Manifest: ir.manifest,
ImageName: ir.filename, ImageName: ir.filename,
Exports: ir.exports, Exports: ir.exports,
PipelineNames: ir.pipelineNames,
KojiServer: request.Koji.Server, KojiServer: request.Koji.Server,
KojiDirectory: kojiDirectory, KojiDirectory: kojiDirectory,
KojiFilename: kojiFilenames[i], KojiFilename: kojiFilenames[i],
@ -432,8 +438,8 @@ func (h *apiHandlers) getFinalizeJob(id uuid.UUID) (*worker.KojiFinalizeJob, []u
} }
// getInitJob retrieves a KojiInitJob from the job queue given its ID. // getInitJob retrieves a KojiInitJob from the job queue given its ID.
// It returns an error if the ID matches a job of a different type.
func (h *apiHandlers) getInitJob(id uuid.UUID) (*worker.KojiInitJob, error) { func (h *apiHandlers) getInitJob(id uuid.UUID) (*worker.KojiInitJob, error) {
// It returns an error if the ID matches a job of a different type.
job := new(worker.KojiInitJob) job := new(worker.KojiInitJob)
jobType, _, _, err := h.server.workers.Job(id, job) jobType, _, _, err := h.server.workers.Job(id, job)
if err != nil { if err != nil {

View file

@ -2350,6 +2350,10 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request
ImageName: imageType.Filename(), ImageName: imageType.Filename(),
StreamOptimized: imageType.Name() == "vmdk", // https://github.com/osbuild/osbuild/issues/528 StreamOptimized: imageType.Name() == "vmdk", // https://github.com/osbuild/osbuild/issues/528
Exports: imageType.Exports(), Exports: imageType.Exports(),
PipelineNames: &worker.PipelineNames{
Build: imageType.BuildPipelines(),
Payload: imageType.PayloadPipelines(),
},
}) })
if err == nil { if err == nil {
err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId, packageSets["packages"]) err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId, packageSets["packages"])