debian-forge-composer/internal/cloudapi/v2/server.go
Achilleas Koutsou d7a5bb99c1 ostree: replace FetchChecksum with ParentRef on ostree.ImageOptions
The FetchChecksum on ostree.ImageOptions was the resolved commit ID of
the parent ref to be pulled (for ostree commits and containers) or the
commit ID of the content ref (for ostree installers and raw images).
With the new process of manifest creation and serialisation, using the
image options to transport resolved content references is bad and
confusing.  Image options should only reflect user and image type
options before any references are resolved.  With this change, the
ostree.ImageOptions should only reflect the ostree-related options
specified by the user.  Commit IDs will only be available after the
manifest is initialised when the commit sources are resolved (before
serialisation).
2023-06-14 11:19:29 +02:00

514 lines
16 KiB
Go

package v2
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/getkin/kin-openapi/openapi3"
"github.com/getkin/kin-openapi/routers"
legacyrouter "github.com/getkin/kin-openapi/routers/legacy"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/container"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/distroregistry"
"github.com/osbuild/osbuild-composer/internal/ostree"
"github.com/osbuild/osbuild-composer/internal/prometheus"
"github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
)
// Server represents the state of the cloud Server
type Server struct {
workers *worker.Server
distros *distroregistry.Registry
config ServerConfig
router routers.Router
goroutinesCtx context.Context
goroutinesCtxCancel context.CancelFunc
goroutinesGroup sync.WaitGroup
}
type ServerConfig struct {
TenantProviderFields []string
JWTEnabled bool
}
func NewServer(workers *worker.Server, distros *distroregistry.Registry, config ServerConfig) *Server {
ctx, cancel := context.WithCancel(context.Background())
spec, err := GetSwagger()
if err != nil {
panic(err)
}
loader := openapi3.NewLoader()
if err := spec.Validate(loader.Context); err != nil {
panic(err)
}
router, err := legacyrouter.NewRouter(spec)
if err != nil {
panic(err)
}
server := &Server{
workers: workers,
distros: distros,
config: config,
router: router,
goroutinesCtx: ctx,
goroutinesCtxCancel: cancel,
}
return server
}
func (s *Server) Handler(path string) http.Handler {
e := echo.New()
e.Binder = binder{}
e.HTTPErrorHandler = s.HTTPErrorHandler
e.Pre(common.OperationIDMiddleware)
e.Use(middleware.Recover())
e.Logger = common.Logger()
handler := apiHandlers{
server: s,
}
statusMW := prometheus.StatusMiddleware(prometheus.ComposerSubsystem)
RegisterHandlers(e.Group(path, prometheus.MetricsMiddleware, s.ValidateRequest, statusMW), &handler)
return e
}
func (s *Server) Shutdown() {
s.goroutinesCtxCancel()
s.goroutinesGroup.Wait()
}
func (s *Server) enqueueCompose(distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
var id uuid.UUID
if len(irs) != 1 {
return id, HTTPError(ErrorInvalidNumberOfImageBuilds)
}
ir := irs[0]
// NOTE(akoutsou): Image options don't have resolved ostree ref yet, but it
// will affect package sets if we don't add it and it's required. This
// used to be done in the old PackageSets() function (which no longer
// exists), but now we only need it in the cloud API where things aren't
// done in the (new) correct order yet.
ir.imageOptions.OSTree = &ostree.ImageOptions{
ImageRef: ir.imageType.OSTreeRef(),
}
manifestSource, _, err := ir.imageType.Manifest(&bp, ir.imageOptions, ir.repositories, manifestSeed)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
PackageSets: manifestSource.Content.PackageSets,
ModulePlatformID: distribution.ModulePlatformID(),
Arch: ir.arch.Name(),
Releasever: distribution.Releasever(),
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
dependencies := []uuid.UUID{depsolveJobID}
var containerResolveJob uuid.UUID
if len(bp.Containers) > 0 {
job := worker.ContainerResolveJob{
Arch: ir.arch.Name(),
Specs: make([]worker.ContainerSpec, len(bp.Containers)),
}
for i, c := range bp.Containers {
job.Specs[i] = worker.ContainerSpec{
Source: c.Source,
Name: c.Name,
TLSVerify: c.TLSVerify,
}
}
jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
containerResolveJob = jobId
dependencies = append(dependencies, jobId)
}
var ostreeResolveJobID uuid.UUID
if ir.ostree != nil {
jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{
Specs: []worker.OSTreeResolveSpec{
{
URL: ir.ostree.URL,
Ref: ir.ostree.Ref,
Parent: ir.ostree.Parent,
RHSM: ir.ostree.RHSM,
},
},
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
ostreeResolveJobID = jobID
dependencies = append(dependencies, ostreeResolveJobID)
}
manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
id, err = s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
Targets: []*target.Target{ir.target},
PipelineNames: &worker.PipelineNames{
Build: ir.imageType.BuildPipelines(),
Payload: ir.imageType.PayloadPipelines(),
},
}, []uuid.UUID{manifestJobID}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
s.goroutinesGroup.Add(1)
go func() {
generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJob, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
defer s.goroutinesGroup.Done()
}()
return id, nil
}
func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
var id uuid.UUID
kojiDirectory := "osbuild-cg/osbuild-composer-koji-" + uuid.New().String()
initID, err := s.workers.EnqueueKojiInit(&worker.KojiInitJob{
Server: server,
Name: name,
Version: version,
Release: release,
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
var kojiFilenames []string
var buildIDs []uuid.UUID
for _, ir := range irs {
// NOTE(akoutsou): Image options don't have resolved ostree ref yet, but it
// will affect package sets if we don't add it and it's required. This
// used to be done in the old PackageSets() function (which no longer
// exists), but now we only need it in the cloud API where things aren't
// done in the (new) correct order yet.
ir.imageOptions.OSTree = &ostree.ImageOptions{
ImageRef: ir.imageType.OSTreeRef(),
}
manifestSource, _, err := ir.imageType.Manifest(&bp, ir.imageOptions, ir.repositories, manifestSeed)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
PackageSets: manifestSource.Content.PackageSets,
ModulePlatformID: distribution.ModulePlatformID(),
Arch: ir.arch.Name(),
Releasever: distribution.Releasever(),
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
var containerResolveJob uuid.UUID
dependencies := []uuid.UUID{depsolveJobID}
if len(bp.Containers) > 0 {
job := worker.ContainerResolveJob{
Arch: ir.arch.Name(),
Specs: make([]worker.ContainerSpec, len(bp.Containers)),
}
for i, c := range bp.Containers {
job.Specs[i] = worker.ContainerSpec{
Source: c.Source,
Name: c.Name,
TLSVerify: c.TLSVerify,
}
}
jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
containerResolveJob = jobId
dependencies = append(dependencies, jobId)
}
var ostreeResolveJobID uuid.UUID
if ir.ostree != nil {
jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{
Specs: []worker.OSTreeResolveSpec{
{
URL: ir.ostree.URL,
Ref: ir.ostree.Ref,
Parent: ir.ostree.Parent,
},
},
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
ostreeResolveJobID = jobID
dependencies = append(dependencies, ostreeResolveJobID)
}
manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
kojiFilename := fmt.Sprintf(
"%s-%s-%s.%s%s",
name,
version,
release,
ir.arch.Name(),
splitExtension(ir.imageType.Filename()),
)
kojiTarget := target.NewKojiTarget(&target.KojiTargetOptions{
Server: server,
UploadDirectory: kojiDirectory,
})
kojiTarget.OsbuildArtifact.ExportFilename = ir.imageType.Filename()
kojiTarget.OsbuildArtifact.ExportName = ir.imageType.Exports()[0]
kojiTarget.ImageName = kojiFilename
targets := []*target.Target{kojiTarget}
// add any cloud upload target if defined
if ir.target != nil {
targets = append(targets, ir.target)
}
buildID, err := s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
PipelineNames: &worker.PipelineNames{
Build: ir.imageType.BuildPipelines(),
Payload: ir.imageType.PayloadPipelines(),
},
Targets: targets,
ManifestDynArgsIdx: common.ToPtr(1),
}, []uuid.UUID{initID, manifestJobID}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
kojiFilenames = append(kojiFilenames, kojiFilename)
buildIDs = append(buildIDs, buildID)
// copy the image request while passing it into the goroutine to prevent data races
s.goroutinesGroup.Add(1)
go func(ir imageRequest) {
generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJob, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
defer s.goroutinesGroup.Done()
}(ir)
}
id, err = s.workers.EnqueueKojiFinalize(&worker.KojiFinalizeJob{
Server: server,
Name: name,
Version: version,
Release: release,
KojiFilenames: kojiFilenames,
KojiDirectory: kojiDirectory,
TaskID: taskID,
StartTime: uint64(time.Now().Unix()),
}, initID, buildIDs, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
return id, nil
}
func generateManifest(ctx context.Context, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, imageType distro.ImageType, repos []rpmmd.RepoConfig, options distro.ImageOptions, seed int64, b *blueprint.Blueprint) {
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
defer cancel()
// wait until job is in a pending state
var token uuid.UUID
var dynArgs []json.RawMessage
var err error
logWithId := logrus.WithField("jobId", manifestJobID)
for {
_, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID)
if err == jobqueue.ErrNotPending {
logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish")
time.Sleep(time.Millisecond * 50)
select {
case <-ctx.Done():
logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
break
default:
continue
}
}
if err != nil {
logWithId.Errorf("Error requesting manifest job: %v", err)
return
}
break
}
jobResult := &worker.ManifestJobByIDResult{
Manifest: nil,
}
defer func() {
if jobResult.JobError != nil {
logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
}
result, err := json.Marshal(jobResult)
if err != nil {
logWithId.Errorf("Error marshalling manifest job results: %v", err)
}
err = workers.FinishJob(token, result)
if err != nil {
logWithId.Errorf("Error finishing manifest job: %v", err)
}
}()
if len(dynArgs) == 0 {
reason := "No dynamic arguments"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorNoDynamicArgs, reason, nil)
return
}
var depsolveResults worker.DepsolveJobResult
err = json.Unmarshal(dynArgs[0], &depsolveResults)
if err != nil {
reason := "Error parsing dynamic arguments"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingDynamicArgs, reason, nil)
return
}
_, err = workers.DepsolveJobInfo(depsolveJobID, &depsolveResults)
if err != nil {
reason := "Error reading depsolve status"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
return
}
if jobErr := depsolveResults.JobError; jobErr != nil {
if jobErr.ID == clienterrors.ErrorDNFDepsolveError || jobErr.ID == clienterrors.ErrorDNFMarkingErrors {
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency input, bad package set requested", nil)
return
}
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency", nil)
return
}
if len(depsolveResults.PackageSpecs) == 0 {
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorEmptyPackageSpecs, "Received empty package specs", nil)
return
}
var containerSpecs []container.Spec
if containerResolveJobID != uuid.Nil {
// Container resolve job
var result worker.ContainerResolveJobResult
_, err := workers.ContainerResolveJobInfo(containerResolveJobID, &result)
if err != nil {
reason := "Error reading container resolve job status"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
return
}
if jobErr := result.JobError; jobErr != nil {
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorContainerDependency, "Error in container resolve job dependency", nil)
return
}
containerSpecs = make([]container.Spec, len(result.Specs))
for i, s := range result.Specs {
containerSpecs[i].Source = s.Source
containerSpecs[i].Digest = s.Digest
containerSpecs[i].LocalName = s.Name
containerSpecs[i].TLSVerify = s.TLSVerify
containerSpecs[i].ImageID = s.ImageID
containerSpecs[i].ListDigest = s.ListDigest
}
}
if ostreeResolveJobID != uuid.Nil {
var result worker.OSTreeResolveJobResult
_, err := workers.OSTreeResolveJobInfo(ostreeResolveJobID, &result)
if err != nil {
reason := "Error reading ostree resolve job status"
logrus.Errorf("%s: %v", reason, err)
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
return
}
if jobErr := result.JobError; jobErr != nil {
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOSTreeDependency, "Error in ostree resolve job dependency", nil)
return
}
options.OSTree = &ostree.ImageOptions{
ImageRef: result.Specs[0].Ref,
ParentRef: result.Specs[0].Checksum,
URL: result.Specs[0].URL,
}
}
manifest, _, err := imageType.Manifest(b, options, repos, seed)
if err != nil {
reason := "Error generating manifest"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason, nil)
return
}
// NOTE: This assumes that containers are only embedded in the first
// payload pipeline, which is currently true for all image types but might
// not necessarily be in the future. This is a workaround required for this
// temporary state where the cloud API is not using the new manifest
// generation procedure. Once it's updated, the container specs will be
// mapped to pipeline names properly by the image type itself.
var payloadPipelineName string
if pipelineNames := imageType.PayloadPipelines(); len(pipelineNames) > 0 {
payloadPipelineName = pipelineNames[0]
} else {
panic(fmt.Sprintf("ImageType %q does not define payload pipelines - this is a programming error", imageType.Name()))
}
// TODO: resolve ostree source spec from manifest content and pass here.
ms, err := manifest.Serialize(depsolveResults.PackageSpecs, map[string][]container.Spec{payloadPipelineName: containerSpecs}, nil)
jobResult.Manifest = ms
}