debian-forge-composer/internal/cloudapi/v2/server.go
Achilleas Koutsou 1723e588ce cloudapi: set the default ostree ref for package selection
Before, this was done in the PackageSets() function.
The reason for this is that having an ostree ref affects package
selection (for example, it adds rpm-ostree).  At the package selection
phase, it doesn't matter what the ostree ref is; it is just used to
determine if a pipeline is for an ostree-based image type and it doesn't
affect non-ostree-based image types because the image functions ignore
it.

This is only needed in the cloudapi now because other places have
switched to using the new order of operations, where the manifest is
generated after the ostree commit is resolved, so it's always added when
needed.
2023-05-31 16:40:07 +02:00

513 lines
16 KiB
Go

package v2
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/getkin/kin-openapi/openapi3"
"github.com/getkin/kin-openapi/routers"
legacyrouter "github.com/getkin/kin-openapi/routers/legacy"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/container"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/distroregistry"
"github.com/osbuild/osbuild-composer/internal/ostree"
"github.com/osbuild/osbuild-composer/internal/prometheus"
"github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
)
// Server represents the state of the cloud Server
type Server struct {
workers *worker.Server
distros *distroregistry.Registry
config ServerConfig
router routers.Router
goroutinesCtx context.Context
goroutinesCtxCancel context.CancelFunc
goroutinesGroup sync.WaitGroup
}
type ServerConfig struct {
TenantProviderFields []string
JWTEnabled bool
}
func NewServer(workers *worker.Server, distros *distroregistry.Registry, config ServerConfig) *Server {
ctx, cancel := context.WithCancel(context.Background())
spec, err := GetSwagger()
if err != nil {
panic(err)
}
loader := openapi3.NewLoader()
if err := spec.Validate(loader.Context); err != nil {
panic(err)
}
router, err := legacyrouter.NewRouter(spec)
if err != nil {
panic(err)
}
server := &Server{
workers: workers,
distros: distros,
config: config,
router: router,
goroutinesCtx: ctx,
goroutinesCtxCancel: cancel,
}
return server
}
func (s *Server) Handler(path string) http.Handler {
e := echo.New()
e.Binder = binder{}
e.HTTPErrorHandler = s.HTTPErrorHandler
e.Pre(common.OperationIDMiddleware)
e.Use(middleware.Recover())
e.Logger = common.Logger()
handler := apiHandlers{
server: s,
}
statusMW := prometheus.StatusMiddleware(prometheus.ComposerSubsystem)
RegisterHandlers(e.Group(path, prometheus.MetricsMiddleware, s.ValidateRequest, statusMW), &handler)
return e
}
func (s *Server) Shutdown() {
s.goroutinesCtxCancel()
s.goroutinesGroup.Wait()
}
func (s *Server) enqueueCompose(distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
var id uuid.UUID
if len(irs) != 1 {
return id, HTTPError(ErrorInvalidNumberOfImageBuilds)
}
ir := irs[0]
// NOTE(akoutsou): Image options don't have resolved ostree ref yet, but it
// will affect package sets if we don't add it and it's required. This
// used to be done in the old PackageSets() function (which no longer
// exists), but now we only need it in the cloud API where things aren't
// done in the (new) correct order yet.
ir.imageOptions.OSTree = &ostree.ImageOptions{
ImageRef: ir.imageType.OSTreeRef(),
}
manifestSource, _, err := ir.imageType.Manifest(&bp, ir.imageOptions, ir.repositories, manifestSeed)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
PackageSets: manifestSource.Content.PackageSets,
ModulePlatformID: distribution.ModulePlatformID(),
Arch: ir.arch.Name(),
Releasever: distribution.Releasever(),
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
dependencies := []uuid.UUID{depsolveJobID}
var containerResolveJob uuid.UUID
if len(bp.Containers) > 0 {
job := worker.ContainerResolveJob{
Arch: ir.arch.Name(),
Specs: make([]worker.ContainerSpec, len(bp.Containers)),
}
for i, c := range bp.Containers {
job.Specs[i] = worker.ContainerSpec{
Source: c.Source,
Name: c.Name,
TLSVerify: c.TLSVerify,
}
}
jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
containerResolveJob = jobId
dependencies = append(dependencies, jobId)
}
var ostreeResolveJobID uuid.UUID
if ir.ostree != nil {
jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{
Specs: []worker.OSTreeResolveSpec{
{
URL: ir.ostree.URL,
Ref: ir.ostree.Ref,
Parent: ir.ostree.Parent,
RHSM: ir.ostree.RHSM,
},
},
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
ostreeResolveJobID = jobID
dependencies = append(dependencies, ostreeResolveJobID)
}
manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
id, err = s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
Targets: []*target.Target{ir.target},
PipelineNames: &worker.PipelineNames{
Build: ir.imageType.BuildPipelines(),
Payload: ir.imageType.PayloadPipelines(),
},
}, []uuid.UUID{manifestJobID}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
s.goroutinesGroup.Add(1)
go func() {
generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJob, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
defer s.goroutinesGroup.Done()
}()
return id, nil
}
func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
var id uuid.UUID
kojiDirectory := "osbuild-cg/osbuild-composer-koji-" + uuid.New().String()
initID, err := s.workers.EnqueueKojiInit(&worker.KojiInitJob{
Server: server,
Name: name,
Version: version,
Release: release,
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
var kojiFilenames []string
var buildIDs []uuid.UUID
for _, ir := range irs {
// NOTE(akoutsou): Image options don't have resolved ostree ref yet, but it
// will affect package sets if we don't add it and it's required. This
// used to be done in the old PackageSets() function (which no longer
// exists), but now we only need it in the cloud API where things aren't
// done in the (new) correct order yet.
ir.imageOptions.OSTree = &ostree.ImageOptions{
ImageRef: ir.imageType.OSTreeRef(),
}
manifestSource, _, err := ir.imageType.Manifest(&bp, ir.imageOptions, ir.repositories, manifestSeed)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
PackageSets: manifestSource.Content.PackageSets,
ModulePlatformID: distribution.ModulePlatformID(),
Arch: ir.arch.Name(),
Releasever: distribution.Releasever(),
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
var containerResolveJob uuid.UUID
dependencies := []uuid.UUID{depsolveJobID}
if len(bp.Containers) > 0 {
job := worker.ContainerResolveJob{
Arch: ir.arch.Name(),
Specs: make([]worker.ContainerSpec, len(bp.Containers)),
}
for i, c := range bp.Containers {
job.Specs[i] = worker.ContainerSpec{
Source: c.Source,
Name: c.Name,
TLSVerify: c.TLSVerify,
}
}
jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
containerResolveJob = jobId
dependencies = append(dependencies, jobId)
}
var ostreeResolveJobID uuid.UUID
if ir.ostree != nil {
jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{
Specs: []worker.OSTreeResolveSpec{
{
URL: ir.ostree.URL,
Ref: ir.ostree.Ref,
Parent: ir.ostree.Parent,
},
},
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
ostreeResolveJobID = jobID
dependencies = append(dependencies, ostreeResolveJobID)
}
manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
kojiFilename := fmt.Sprintf(
"%s-%s-%s.%s%s",
name,
version,
release,
ir.arch.Name(),
splitExtension(ir.imageType.Filename()),
)
kojiTarget := target.NewKojiTarget(&target.KojiTargetOptions{
Server: server,
UploadDirectory: kojiDirectory,
})
kojiTarget.OsbuildArtifact.ExportFilename = ir.imageType.Filename()
kojiTarget.OsbuildArtifact.ExportName = ir.imageType.Exports()[0]
kojiTarget.ImageName = kojiFilename
targets := []*target.Target{kojiTarget}
// add any cloud upload target if defined
if ir.target != nil {
targets = append(targets, ir.target)
}
buildID, err := s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
PipelineNames: &worker.PipelineNames{
Build: ir.imageType.BuildPipelines(),
Payload: ir.imageType.PayloadPipelines(),
},
Targets: targets,
ManifestDynArgsIdx: common.ToPtr(1),
}, []uuid.UUID{initID, manifestJobID}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
kojiFilenames = append(kojiFilenames, kojiFilename)
buildIDs = append(buildIDs, buildID)
// copy the image request while passing it into the goroutine to prevent data races
s.goroutinesGroup.Add(1)
go func(ir imageRequest) {
generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJob, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
defer s.goroutinesGroup.Done()
}(ir)
}
id, err = s.workers.EnqueueKojiFinalize(&worker.KojiFinalizeJob{
Server: server,
Name: name,
Version: version,
Release: release,
KojiFilenames: kojiFilenames,
KojiDirectory: kojiDirectory,
TaskID: taskID,
StartTime: uint64(time.Now().Unix()),
}, initID, buildIDs, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
return id, nil
}
func generateManifest(ctx context.Context, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, imageType distro.ImageType, repos []rpmmd.RepoConfig, options distro.ImageOptions, seed int64, b *blueprint.Blueprint) {
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
defer cancel()
// wait until job is in a pending state
var token uuid.UUID
var dynArgs []json.RawMessage
var err error
logWithId := logrus.WithField("jobId", manifestJobID)
for {
_, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID)
if err == jobqueue.ErrNotPending {
logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish")
time.Sleep(time.Millisecond * 50)
select {
case <-ctx.Done():
logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
break
default:
continue
}
}
if err != nil {
logWithId.Errorf("Error requesting manifest job: %v", err)
return
}
break
}
jobResult := &worker.ManifestJobByIDResult{
Manifest: nil,
}
defer func() {
if jobResult.JobError != nil {
logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
}
result, err := json.Marshal(jobResult)
if err != nil {
logWithId.Errorf("Error marshalling manifest job results: %v", err)
}
err = workers.FinishJob(token, result)
if err != nil {
logWithId.Errorf("Error finishing manifest job: %v", err)
}
}()
if len(dynArgs) == 0 {
reason := "No dynamic arguments"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorNoDynamicArgs, reason, nil)
return
}
var depsolveResults worker.DepsolveJobResult
err = json.Unmarshal(dynArgs[0], &depsolveResults)
if err != nil {
reason := "Error parsing dynamic arguments"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingDynamicArgs, reason, nil)
return
}
_, err = workers.DepsolveJobInfo(depsolveJobID, &depsolveResults)
if err != nil {
reason := "Error reading depsolve status"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
return
}
if jobErr := depsolveResults.JobError; jobErr != nil {
if jobErr.ID == clienterrors.ErrorDNFDepsolveError || jobErr.ID == clienterrors.ErrorDNFMarkingErrors {
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency input, bad package set requested", nil)
return
}
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency", nil)
return
}
if len(depsolveResults.PackageSpecs) == 0 {
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorEmptyPackageSpecs, "Received empty package specs", nil)
return
}
var containerSpecs []container.Spec
if containerResolveJobID != uuid.Nil {
// Container resolve job
var result worker.ContainerResolveJobResult
_, err := workers.ContainerResolveJobInfo(containerResolveJobID, &result)
if err != nil {
reason := "Error reading container resolve job status"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
return
}
if jobErr := result.JobError; jobErr != nil {
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorContainerDependency, "Error in container resolve job dependency", nil)
return
}
containerSpecs = make([]container.Spec, len(result.Specs))
for i, s := range result.Specs {
containerSpecs[i].Source = s.Source
containerSpecs[i].Digest = s.Digest
containerSpecs[i].LocalName = s.Name
containerSpecs[i].TLSVerify = s.TLSVerify
containerSpecs[i].ImageID = s.ImageID
containerSpecs[i].ListDigest = s.ListDigest
}
}
if ostreeResolveJobID != uuid.Nil {
var result worker.OSTreeResolveJobResult
_, err := workers.OSTreeResolveJobInfo(ostreeResolveJobID, &result)
if err != nil {
reason := "Error reading ostree resolve job status"
logrus.Errorf("%s: %v", reason, err)
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
return
}
if jobErr := result.JobError; jobErr != nil {
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOSTreeDependency, "Error in ostree resolve job dependency", nil)
return
}
options.OSTree = &ostree.ImageOptions{
ImageRef: result.Specs[0].Ref,
FetchChecksum: result.Specs[0].Checksum,
URL: result.Specs[0].URL,
}
}
manifest, _, err := imageType.Manifest(b, options, repos, seed)
if err != nil {
reason := "Error generating manifest"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason, nil)
return
}
// NOTE: This assumes that containers are only embedded in the first
// payload pipeline, which is currently true for all image types but might
// not necessarily be in the future. This is a workaround required for this
// temporary state where the cloud API is not using the new manifest
// generation procedure. Once it's updated, the container specs will be
// mapped to pipeline names properly by the image type itself.
var payloadPipelineName string
if pipelineNames := imageType.PayloadPipelines(); len(pipelineNames) > 0 {
payloadPipelineName = pipelineNames[0]
} else {
panic(fmt.Sprintf("ImageType %q does not define payload pipelines - this is a programming error", imageType.Name()))
}
ms, err := manifest.Serialize(depsolveResults.PackageSpecs, map[string][]container.Spec{payloadPipelineName: containerSpecs})
jobResult.Manifest = ms
}