cloudapi/v2: Use manifest-id-only job

job dependencies:
depsolve -> manifest -> osbuild

This allows the compose handler to return the osbuild job id
immediately.
This commit is contained in:
sanne 2021-11-15 10:58:52 +01:00 committed by Ondřej Budai
parent ac39dacfae
commit 028eca1b26
4 changed files with 105 additions and 65 deletions

View file

@ -2,9 +2,9 @@
package v2
import (
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
@ -15,11 +15,13 @@ import (
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/distroregistry"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
osbuild "github.com/osbuild/osbuild-composer/internal/osbuild2"
"github.com/osbuild/osbuild-composer/internal/ostree"
"github.com/osbuild/osbuild-composer/internal/prometheus"
@ -149,14 +151,6 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
}
}
var imageRequest struct {
manifest distro.Manifest
arch string
exports []string
target *target.Target
pipelineNames worker.PipelineNames
}
// use the same seed for all images so we get the same IDs
bigSeed, err := rand.Int(rand.Reader, big.NewInt(math.MaxInt64))
if err != nil {
@ -200,30 +194,6 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
return HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
var depsolveResults worker.DepsolveJobResult
for {
status, _, err := h.server.workers.JobStatus(depsolveJobID, &depsolveResults)
if err != nil {
return HTTPErrorWithInternal(ErrorGettingDepsolveJobStatus, err)
}
if status.Canceled {
return HTTPErrorWithInternal(ErrorDepsolveJobCanceled, err)
}
if !status.Finished.IsZero() {
break
}
time.Sleep(50 * time.Millisecond)
}
if depsolveResults.Error != "" {
if depsolveResults.ErrorType == worker.DepsolveErrorType {
return HTTPError(ErrorDNFError)
}
return HTTPErrorWithInternal(ErrorFailedToDepsolve, errors.New(depsolveResults.Error))
}
pkgSpecSets := depsolveResults.PackageSpecs
imageOptions := distro.ImageOptions{Size: imageType.Size(0)}
if request.Customizations != nil && request.Customizations.Subscription != nil {
imageOptions.Subscription = &distro.SubscriptionImageOptions{
@ -279,19 +249,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
}
}
manifest, err := imageType.Manifest(blueprintCustoms, imageOptions, repositories, pkgSpecSets, manifestSeed)
if err != nil {
return HTTPErrorWithInternal(ErrorFailedToMakeManifest, err)
}
imageRequest.manifest = manifest
imageRequest.arch = arch.Name()
imageRequest.exports = imageType.Exports()
imageRequest.pipelineNames = worker.PipelineNames{
Build: imageType.BuildPipelines(),
Payload: imageType.PayloadPipelines(),
}
var irTarget *target.Target
/* oneOf is not supported by the openapi generator so marshal and unmarshal the uploadrequest based on the type */
switch ir.ImageType {
case ImageTypes_aws:
@ -319,7 +277,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
t.ImageName = key
}
imageRequest.target = t
irTarget = t
case ImageTypes_edge_installer:
fallthrough
case ImageTypes_edge_commit:
@ -342,7 +300,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
})
t.ImageName = key
imageRequest.target = t
irTarget = t
case ImageTypes_gcp:
var gcpUploadOptions GCPUploadOptions
jsonUploadOptions, err := json.Marshal(ir.UploadOptions)
@ -375,7 +333,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
t.ImageName = object
}
imageRequest.target = t
irTarget = t
case ImageTypes_azure:
var azureUploadOptions AzureUploadOptions
jsonUploadOptions, err := json.Marshal(ir.UploadOptions)
@ -401,23 +359,97 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
t.ImageName = fmt.Sprintf("composer-api-%s", uuid.New().String())
}
imageRequest.target = t
irTarget = t
default:
return HTTPError(ErrorUnsupportedImageType)
}
id, err := h.server.workers.EnqueueOSBuild(imageRequest.arch, &worker.OSBuildJob{
Manifest: imageRequest.manifest,
Targets: []*target.Target{imageRequest.target},
Exports: imageRequest.exports,
PipelineNames: &imageRequest.pipelineNames,
})
manifestJobID, err := h.server.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID)
if err != nil {
return HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
id, err := h.server.workers.EnqueueOSBuildAsDependency(arch.Name(), &worker.OSBuildJob{
Targets: []*target.Target{irTarget},
Exports: imageType.Exports(),
}, manifestJobID)
if err != nil {
return HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
ctx.Logger().Infof("Job ID %s enqueued for operationID %s", id, ctx.Get("operationID"))
// start 1 goroutine which requests datajob type
go func(workers *worker.Server, manifestJobID uuid.UUID, b *blueprint.Customizations, options distro.ImageOptions, repos []rpmmd.RepoConfig, seed int64) {
// wait until job is in a pending state
var token uuid.UUID
var dynArgs []json.RawMessage
var err error
for {
_, token, _, _, dynArgs, err = workers.RequestJobById(context.Background(), "", manifestJobID)
if err == jobqueue.ErrNotPending {
logrus.Debugf("Manifest job %v not pending, waiting for depsolve job to finish", manifestJobID)
time.Sleep(time.Millisecond * 50)
continue
}
if err != nil {
logrus.Errorf("Error requesting manifest job: %v", err)
return
}
break
}
var jobResult *worker.ManifestJobByIDResult = &worker.ManifestJobByIDResult{
Manifest: nil,
Error: "",
}
defer func() {
if jobResult.Error != "" {
logrus.Errorf("Error in manifest job %v: %v", manifestJobID, jobResult.Error)
}
result, err := json.Marshal(jobResult)
if err != nil {
logrus.Errorf("Error marshalling manifest job %v results: %v", manifestJobID, err)
}
err = workers.FinishJob(token, result)
if err != nil {
logrus.Errorf("Error finishing manifest job: %v", err)
}
}()
if len(dynArgs) == 0 {
jobResult.Error = "No dynamic arguments"
return
}
var depsolveResults worker.DepsolveJobResult
err = json.Unmarshal(dynArgs[0], &depsolveResults)
if err != nil {
jobResult.Error = "Error parsing dynamic arguments"
return
}
if depsolveResults.Error != "" {
if depsolveResults.ErrorType == worker.DepsolveErrorType {
jobResult.Error = "Error in depsolve job dependency input, bad request"
return
}
jobResult.Error = "Error in depsolve job dependency"
return
}
manifest, err := imageType.Manifest(b, options, repos, depsolveResults.PackageSpecs, seed)
if err != nil {
jobResult.Error = "Error generating manifest"
return
}
jobResult.Manifest = manifest
}(h.server.workers, manifestJobID, blueprintCustoms, imageOptions, repositories, manifestSeed)
return ctx.JSON(http.StatusCreated, &ComposeId{
ObjectReference: ObjectReference{
Href: "/api/image-builder-composer/v2/compose",

View file

@ -15,6 +15,7 @@ import (
"github.com/osbuild/osbuild-composer/internal/distro/test_distro"
distro_mock "github.com/osbuild/osbuild-composer/internal/mocks/distro"
rpmmd_mock "github.com/osbuild/osbuild-composer/internal/mocks/rpmmd"
"github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/test"
"github.com/osbuild/osbuild-composer/internal/worker"
)
@ -39,9 +40,12 @@ func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context.
if err != nil {
continue
}
rawMsg, err := json.Marshal(&worker.DepsolveJobResult{PackageSpecs: nil, Error: "", ErrorType: worker.ErrorType("")})
rawMsg, err := json.Marshal(&worker.DepsolveJobResult{PackageSpecs: map[string][]rpmmd.PackageSpec{"build": []rpmmd.PackageSpec{rpmmd.PackageSpec{Name: "pkg1"}}}, Error: "", ErrorType: worker.ErrorType("")})
require.NoError(t, err)
require.NoError(t, rpmFixture.Workers.FinishJob(token, rawMsg))
err = rpmFixture.Workers.FinishJob(token, rawMsg)
if err != nil {
return
}
select {
case <-depsolveContext.Done():
@ -272,10 +276,16 @@ func TestComposeStatusSuccess(t *testing.T) {
"kind": "ComposeId"
}`, "id")
jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"})
jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"})
require.NoError(t, err)
require.Equal(t, "osbuild", jobType)
var osbuildJob worker.OSBuildJob
err = json.Unmarshal(args, &osbuildJob)
require.NoError(t, err)
require.Equal(t, 0, len(osbuildJob.Manifest))
require.NotEqual(t, 0, len(dynArgs[0]))
test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "GET", fmt.Sprintf("/api/image-builder-composer/v2/composes/%v", jobId), ``, http.StatusOK, fmt.Sprintf(`
{
"href": "/api/image-builder-composer/v2/composes/%v",
@ -284,7 +294,6 @@ func TestComposeStatusSuccess(t *testing.T) {
"image_status": {"status": "building"}
}`, jobId, jobId))
// todo make it an osbuildjobresult
res, err := json.Marshal(&worker.OSBuildJobResult{
Success: true,
})
@ -308,7 +317,6 @@ func TestComposeStatusSuccess(t *testing.T) {
"code": "IMAGE-BUILDER-COMPOSER-1012",
"reason": "OSBuildJobResult does not have expected fields set"
}`, "operation_id")
}
func TestComposeStatusFailure(t *testing.T) {

View file

@ -14,7 +14,7 @@ import (
//
type OSBuildJob struct {
Manifest distro.Manifest `json:"manifest"`
Manifest distro.Manifest `json:"manifest,omitempty"`
Targets []*target.Target `json:"targets,omitempty"`
ImageName string `json:"image_name,omitempty"`
StreamOptimized bool `json:"stream_optimized,omitempty"`