debian-forge-composer/internal/cloudapi/v2/server.go

package v2

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/getkin/kin-openapi/openapi3"
	"github.com/getkin/kin-openapi/routers"
	legacyrouter "github.com/getkin/kin-openapi/routers/legacy"
	"github.com/google/uuid"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/sirupsen/logrus"

	"github.com/osbuild/osbuild-composer/pkg/jobqueue"

	"github.com/osbuild/osbuild-composer/internal/blueprint"
	"github.com/osbuild/osbuild-composer/internal/common"
	"github.com/osbuild/osbuild-composer/internal/container"
	"github.com/osbuild/osbuild-composer/internal/distro"
	"github.com/osbuild/osbuild-composer/internal/distroregistry"
	"github.com/osbuild/osbuild-composer/internal/ostree"
	"github.com/osbuild/osbuild-composer/internal/prometheus"
	"github.com/osbuild/osbuild-composer/internal/rpmmd"
	"github.com/osbuild/osbuild-composer/internal/target"
	"github.com/osbuild/osbuild-composer/internal/worker"
	"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
)

// Server represents the state of the cloud Server
type Server struct {
	workers             *worker.Server
	distros             *distroregistry.Registry
	config              ServerConfig
	router              routers.Router
	goroutinesCtx       context.Context
	goroutinesCtxCancel context.CancelFunc
	goroutinesGroup     sync.WaitGroup
}
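
// ServerConfig holds the settings passed to NewServer: the claim fields used
// to identify a tenant and whether JWT authentication is enabled.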
type ServerConfig struct {
	TenantProviderFields []string
	JWTEnabled           bool
}
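
// NewServer creates a cloud API server backed by the given worker server and
// distro registry. It loads and validates the embedded OpenAPI specification
// and builds the request router from it, panicking if either step fails.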
func NewServer(workers *worker.Server, distros *distroregistry.Registry, config ServerConfig) *Server {
	ctx, cancel := context.WithCancel(context.Background())

	spec, err := GetSwagger()
	if err != nil {
		panic(err)
	}

	loader := openapi3.NewLoader()
	if err := spec.Validate(loader.Context); err != nil {
		panic(err)
	}

	router, err := legacyrouter.NewRouter(spec)
	if err != nil {
		panic(err)
	}

	server := &Server{
		workers:             workers,
		distros:             distros,
		config:              config,
		router:              router,
		goroutinesCtx:       ctx,
		goroutinesCtxCancel: cancel,
	}

	return server
}
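
// Handler returns an http.Handler that serves the cloud API under the given
// path prefix, wiring request validation, Prometheus metrics and the
// generated route handlers on top of an echo instance.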
func (s *Server) Handler(path string) http.Handler {
	e := echo.New()
	e.Binder = binder{}
	e.HTTPErrorHandler = s.HTTPErrorHandler
	e.Pre(common.OperationIDMiddleware)
	e.Use(middleware.Recover())
	e.Logger = common.Logger()

	handler := apiHandlers{
		server: s,
	}

	statusMW := prometheus.StatusMiddleware(prometheus.ComposerSubsystem)
	RegisterHandlers(e.Group(path, prometheus.MetricsMiddleware, s.ValidateRequest, statusMW), &handler)

	return e
}
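
// Shutdown cancels the context shared by all background manifest-generation
// goroutines and waits for them to finish.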
func (s *Server) Shutdown() {
	s.goroutinesCtxCancel()
	s.goroutinesGroup.Wait()
}
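
// enqueueCompose enqueues the job chain for a single image request: a
// depsolve job (plus optional container and OSTree resolve jobs), a manifest
// job depending on them, and finally the osbuild job itself. The manifest
// content is produced asynchronously by generateManifest.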
func (s *Server) enqueueCompose(distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
	var id uuid.UUID
	if len(irs) != 1 {
		return id, HTTPError(ErrorInvalidNumberOfImageBuilds)
	}
	ir := irs[0]

	depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
		PackageSets:      ir.imageType.PackageSets(bp, ir.imageOptions, ir.repositories),
		ModulePlatformID: distribution.ModulePlatformID(),
		Arch:             ir.arch.Name(),
		Releasever:       distribution.Releasever(),
	}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}
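
	// The manifest job depends on the depsolve job and, when the blueprint
	// requires them, on the container and OSTree resolve jobs enqueued below.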
	dependencies := []uuid.UUID{depsolveJobID}

	var containerResolveJob uuid.UUID
	if len(bp.Containers) > 0 {
		job := worker.ContainerResolveJob{
			Arch:  ir.arch.Name(),
			Specs: make([]worker.ContainerSpec, len(bp.Containers)),
		}

		for i, c := range bp.Containers {
			job.Specs[i] = worker.ContainerSpec{
				Source:    c.Source,
				Name:      c.Name,
				TLSVerify: c.TLSVerify,
			}
		}

		jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		containerResolveJob = jobId
		dependencies = append(dependencies, jobId)
	}

	var ostreeResolveJobID uuid.UUID
	if ir.ostree != nil {
		jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{
			Specs: []worker.OSTreeResolveSpec{
				{
					URL:    ir.ostree.URL,
					Ref:    ir.ostree.Ref,
					Parent: ir.ostree.Parent,
					RHSM:   ir.ostree.RHSM,
				},
			},
		}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		ostreeResolveJobID = jobID
		dependencies = append(dependencies, ostreeResolveJobID)
	}
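
	// The manifest job is enqueued without arguments; its result is filled
	// in by the generateManifest goroutine once its dependencies finish.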
	manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	id, err = s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
		Targets: []*target.Target{ir.target},
		PipelineNames: &worker.PipelineNames{
			Build:   ir.imageType.BuildPipelines(),
			Payload: ir.imageType.PayloadPipelines(),
		},
	}, []uuid.UUID{manifestJobID}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}
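
	// Generate the manifest in a background goroutine; Shutdown waits for it
	// through goroutinesGroup.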
	s.goroutinesGroup.Add(1)
	go func() {
		defer s.goroutinesGroup.Done()
		generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJob, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
	}()

	return id, nil
}
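
// enqueueKojiCompose enqueues the job chain for a Koji compose: a koji-init
// job, one depsolve/manifest/osbuild chain per image request, and a
// koji-finalize job that depends on all of the builds.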
func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
	var id uuid.UUID
	kojiDirectory := "osbuild-cg/osbuild-composer-koji-" + uuid.New().String()

	initID, err := s.workers.EnqueueKojiInit(&worker.KojiInitJob{
		Server:  server,
		Name:    name,
		Version: version,
		Release: release,
	}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	var kojiFilenames []string
	var buildIDs []uuid.UUID
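
	// Enqueue a full depsolve → manifest → osbuild chain for every image
	// request; each osbuild job also depends on the shared koji-init job.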
	for _, ir := range irs {
		depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
			PackageSets:      ir.imageType.PackageSets(bp, ir.imageOptions, ir.repositories),
			ModulePlatformID: distribution.ModulePlatformID(),
			Arch:             ir.arch.Name(),
			Releasever:       distribution.Releasever(),
		}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		var containerResolveJob uuid.UUID
		dependencies := []uuid.UUID{depsolveJobID}

		if len(bp.Containers) > 0 {
			job := worker.ContainerResolveJob{
				Arch:  ir.arch.Name(),
				Specs: make([]worker.ContainerSpec, len(bp.Containers)),
			}

			for i, c := range bp.Containers {
				job.Specs[i] = worker.ContainerSpec{
					Source:    c.Source,
					Name:      c.Name,
					TLSVerify: c.TLSVerify,
				}
			}

			jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
			if err != nil {
				return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
			}

			containerResolveJob = jobId
			dependencies = append(dependencies, jobId)
		}

		var ostreeResolveJobID uuid.UUID
		if ir.ostree != nil {
			jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{
				Specs: []worker.OSTreeResolveSpec{
					{
						URL:    ir.ostree.URL,
						Ref:    ir.ostree.Ref,
						Parent: ir.ostree.Parent,
					},
				},
			}, channel)
			if err != nil {
				return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
			}

			ostreeResolveJobID = jobID
			dependencies = append(dependencies, ostreeResolveJobID)
		}

		manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		kojiFilename := fmt.Sprintf(
			"%s-%s-%s.%s%s",
			name,
			version,
			release,
			ir.arch.Name(),
			splitExtension(ir.imageType.Filename()),
		)

		kojiTarget := target.NewKojiTarget(&target.KojiTargetOptions{
			Server:          server,
			UploadDirectory: kojiDirectory,
		})
		kojiTarget.OsbuildArtifact.ExportFilename = ir.imageType.Filename()
		kojiTarget.OsbuildArtifact.ExportName = ir.imageType.Exports()[0]
		kojiTarget.ImageName = kojiFilename

		targets := []*target.Target{kojiTarget}
		// add any cloud upload target if defined
		if ir.target != nil {
			targets = append(targets, ir.target)
		}
		buildID, err := s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
			PipelineNames: &worker.PipelineNames{
				Build:   ir.imageType.BuildPipelines(),
				Payload: ir.imageType.PayloadPipelines(),
			},
			Targets:            targets,
			ManifestDynArgsIdx: common.ToPtr(1),
		}, []uuid.UUID{initID, manifestJobID}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		kojiFilenames = append(kojiFilenames, kojiFilename)
		buildIDs = append(buildIDs, buildID)

		// copy the image request while passing it into the goroutine to prevent data races
		s.goroutinesGroup.Add(1)
		go func(ir imageRequest) {
			defer s.goroutinesGroup.Done()
			generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJob, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
		}(ir)
	}
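
	// The koji-finalize job depends on the koji-init job and every osbuild
	// job enqueued above, so it only runs once all builds have finished.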
	id, err = s.workers.EnqueueKojiFinalize(&worker.KojiFinalizeJob{
		Server:        server,
		Name:          name,
		Version:       version,
		Release:       release,
		KojiFilenames: kojiFilenames,
		KojiDirectory: kojiDirectory,
		TaskID:        taskID,
		StartTime:     uint64(time.Now().Unix()),
	}, initID, buildIDs, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	return id, nil
}
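
// generateManifest waits for the manifest job and its resolve dependencies,
// collects their results, generates and serializes the manifest, and reports
// the outcome back to the worker server as the manifest job's result.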
func generateManifest(ctx context.Context, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, imageType distro.ImageType, repos []rpmmd.RepoConfig, options distro.ImageOptions, seed int64, b *blueprint.Blueprint) {
	ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
	defer cancel()

	// wait until job is in a pending state
	var token uuid.UUID
	var dynArgs []json.RawMessage
	var err error
	logWithId := logrus.WithField("jobId", manifestJobID)
	for {
		_, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID)
		if err == jobqueue.ErrNotPending {
			logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish")
			time.Sleep(time.Millisecond * 50)
			select {
			case <-ctx.Done():
				logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
				// break only leaves the select; the err != nil check below
				// then returns from the function
				break
			default:
				continue
			}
		}
		if err != nil {
			logWithId.Errorf("Error requesting manifest job: %v", err)
			return
		}
		break
	}
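
	// From here on, always report a result for the manifest job via the
	// deferred FinishJob call, so dependants are never left waiting on a
	// job that silently disappears.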
	jobResult := &worker.ManifestJobByIDResult{
		Manifest: nil,
	}

	defer func() {
		if jobResult.JobError != nil {
			logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
		}

		result, err := json.Marshal(jobResult)
		if err != nil {
			logWithId.Errorf("Error marshalling manifest job results: %v", err)
		}

		err = workers.FinishJob(token, result)
		if err != nil {
			logWithId.Errorf("Error finishing manifest job: %v", err)
		}
	}()

	if len(dynArgs) == 0 {
		reason := "No dynamic arguments"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorNoDynamicArgs, reason, nil)
		return
	}

	var depsolveResults worker.DepsolveJobResult
	err = json.Unmarshal(dynArgs[0], &depsolveResults)
	if err != nil {
		reason := "Error parsing dynamic arguments"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingDynamicArgs, reason, nil)
		return
	}

	_, err = workers.DepsolveJobInfo(depsolveJobID, &depsolveResults)
	if err != nil {
		reason := "Error reading depsolve status"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
		return
	}

	if jobErr := depsolveResults.JobError; jobErr != nil {
		if jobErr.ID == clienterrors.ErrorDNFDepsolveError || jobErr.ID == clienterrors.ErrorDNFMarkingErrors {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency input, bad package set requested", nil)
			return
		}
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency", nil)
		return
	}

	if len(depsolveResults.PackageSpecs) == 0 {
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorEmptyPackageSpecs, "Received empty package specs", nil)
		return
	}
	var containerSpecs []container.Spec
	if containerResolveJobID != uuid.Nil {
		// Container resolve job
		var result worker.ContainerResolveJobResult
		_, err := workers.ContainerResolveJobInfo(containerResolveJobID, &result)
		if err != nil {
			reason := "Error reading container resolve job status"
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
			return
		}

		if jobErr := result.JobError; jobErr != nil {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorContainerDependency, "Error in container resolve job dependency", nil)
			return
		}

		containerSpecs = make([]container.Spec, len(result.Specs))
		for i, s := range result.Specs {
			containerSpecs[i].Source = s.Source
			containerSpecs[i].Digest = s.Digest
			containerSpecs[i].LocalName = s.Name
			containerSpecs[i].TLSVerify = s.TLSVerify
			containerSpecs[i].ImageID = s.ImageID
			containerSpecs[i].ListDigest = s.ListDigest
		}
	}
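
	// If an OSTree resolve job ran, use the resolved commit to fill in the
	// image's OSTree options before generating the manifest.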
	if ostreeResolveJobID != uuid.Nil {
		var result worker.OSTreeResolveJobResult
		_, err := workers.OSTreeResolveJobInfo(ostreeResolveJobID, &result)
		if err != nil {
			reason := "Error reading ostree resolve job status"
			logrus.Errorf("%s: %v", reason, err)
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
			return
		}

		if jobErr := result.JobError; jobErr != nil {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOSTreeDependency, "Error in ostree resolve job dependency", nil)
			return
		}

		options.OSTree = &ostree.ImageOptions{
			ImageRef:      result.Specs[0].Ref,
			FetchChecksum: result.Specs[0].Checksum,
			URL:           result.Specs[0].URL,
		}
	}
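
	// Generate the manifest from the blueprint and the resolved package,
	// container and OSTree content, then serialize it as the job's result.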
	manifest, _, err := imageType.Manifest(b, options, repos, seed)
	if err != nil {
		reason := "Error generating manifest"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason, nil)
		return
	}

	ms, err := manifest.Serialize(depsolveResults.PackageSpecs, containerSpecs)
	if err != nil {
		reason := "Error serializing manifest"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason, nil)
		return
	}

	jobResult.Manifest = ms
}