debian-forge-composer/internal/cloudapi/v2/server.go
Achilleas Koutsou 6e4efabf24 cloudapi: use manifest content to resolve containers
Use the container sources provided by the content on the initialised
manifest instead of the blueprint to resolve containers.  The container
sources on the manifest are a map keyed by the name of the pipeline that
will use the resolved containers, but the worker's container resolve job
works on a slice, so we reread the content map to get the pipeline name
(instead of taking the first payload pipeline from the image type).
This requires that there be only one pipeline that embeds containers,
which is currently true of all our image types.
2023-06-14 11:19:29 +02:00

550 lines
18 KiB
Go

package v2
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/getkin/kin-openapi/openapi3"
"github.com/getkin/kin-openapi/routers"
legacyrouter "github.com/getkin/kin-openapi/routers/legacy"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/container"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/distroregistry"
"github.com/osbuild/osbuild-composer/internal/ostree"
"github.com/osbuild/osbuild-composer/internal/prometheus"
"github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
)
// Server represents the state of the cloud API server. It must be used
// through a pointer because it embeds a sync.WaitGroup.
type Server struct {
	// workers is the worker server on which all jobs are enqueued.
	workers *worker.Server
	// distros is the distro registry handed to NewServer.
	distros *distroregistry.Registry
	// config is the configuration handed to NewServer.
	config ServerConfig
	// router is built from the OpenAPI specification in NewServer.
	router routers.Router

	// goroutinesCtx is passed to the background manifest generation
	// goroutines; it is cancelled by Shutdown.
	goroutinesCtx context.Context
	// goroutinesCtxCancel cancels goroutinesCtx.
	goroutinesCtxCancel context.CancelFunc
	// goroutinesGroup tracks the background manifest generation goroutines
	// so that Shutdown can wait for them to return.
	goroutinesGroup sync.WaitGroup
}
// ServerConfig holds the options a Server is created with.
type ServerConfig struct {
	// TenantProviderFields is a list of field names.
	// NOTE(review): presumably the token fields used to identify a tenant;
	// the consumer is not in this file, confirm there.
	TenantProviderFields []string
	// JWTEnabled is a feature switch.
	// NOTE(review): presumably enables JWT handling for requests; the
	// consumer is not in this file, confirm there.
	JWTEnabled bool
}
// NewServer creates a Server that enqueues its jobs on workers.
//
// The embedded OpenAPI specification is loaded and validated, and a router is
// built from it. Any failure in these steps is treated as unrecoverable and
// results in a panic. The returned server owns a cancellable context for its
// background goroutines; call Shutdown to cancel it and wait for them.
func NewServer(workers *worker.Server, distros *distroregistry.Registry, config ServerConfig) *Server {
	goroutinesCtx, goroutinesCancel := context.WithCancel(context.Background())

	swagger, err := GetSwagger()
	if err != nil {
		panic(err)
	}

	if validationErr := swagger.Validate(openapi3.NewLoader().Context); validationErr != nil {
		panic(validationErr)
	}

	specRouter, err := legacyrouter.NewRouter(swagger)
	if err != nil {
		panic(err)
	}

	return &Server{
		workers:             workers,
		distros:             distros,
		config:              config,
		router:              specRouter,
		goroutinesCtx:       goroutinesCtx,
		goroutinesCtxCancel: goroutinesCancel,
	}
}
// Handler returns the http.Handler that serves the API under the given path
// prefix.
//
// A fresh echo instance is configured with the package's binder, the server's
// error handler, the operation ID pre-middleware, panic recovery and the
// common logger. The generated API routes are registered on a group rooted at
// path, wrapped by the metrics, request validation and status middlewares (in
// that order).
func (s *Server) Handler(path string) http.Handler {
	router := echo.New()
	router.Binder = binder{}
	router.HTTPErrorHandler = s.HTTPErrorHandler
	router.Pre(common.OperationIDMiddleware)
	router.Use(middleware.Recover())
	router.Logger = common.Logger()

	api := &apiHandlers{server: s}
	statusMiddleware := prometheus.StatusMiddleware(prometheus.ComposerSubsystem)
	group := router.Group(path, prometheus.MetricsMiddleware, s.ValidateRequest, statusMiddleware)
	RegisterHandlers(group, api)

	return router
}
// Shutdown cancels the context shared by the server's background goroutines
// and then blocks until every goroutine registered on the wait group has
// returned.
func (s *Server) Shutdown() {
	// Signal first, then wait: the goroutines observe the cancelled context
	// and return, which releases the wait group.
	s.goroutinesCtxCancel()
	s.goroutinesGroup.Wait()
}
// enqueueCompose enqueues the chain of jobs needed to build a single image
// and returns the ID of the final osbuild job.
//
// Exactly one image request is supported; any other number yields
// ErrorInvalidNumberOfImageBuilds. The chain consists of a depsolve job, an
// optional container resolve job, an optional ostree resolve job, a manifest
// job depending on all of those, and the osbuild job depending on the manifest
// job. All jobs are enqueued on the given channel. distribution supplies the
// module platform ID and release version for the depsolve job, and
// manifestSeed is forwarded to manifest generation.
//
// The manifest job is fulfilled by a background goroutine running
// generateManifest; the goroutine is tracked by the server's wait group so
// that Shutdown can wait for it.
//
// Every failure is reported as an HTTP error wrapping ErrorEnqueueingJob
// (except the image request count check mentioned above).
func (s *Server) enqueueCompose(distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
	var id uuid.UUID
	if len(irs) != 1 {
		return id, HTTPError(ErrorInvalidNumberOfImageBuilds)
	}
	ir := irs[0]

	// Initialise the manifest only to learn which content (packages,
	// containers, ostree commits) has to be resolved by the workers.
	manifestSource, _, err := ir.imageType.Manifest(&bp, ir.imageOptions, ir.repositories, manifestSeed)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
		PackageSets:      manifestSource.Content.PackageSets,
		ModulePlatformID: distribution.ModulePlatformID(),
		Arch:             ir.arch.Name(),
		Releasever:       distribution.Releasever(),
	}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}
	dependencies := []uuid.UUID{depsolveJobID}

	var containerResolveJobID uuid.UUID
	containerSources := manifestSource.Content.Containers
	if len(containerSources) > 1 {
		// only one pipeline can embed containers
		pipelines := make([]string, 0, len(containerSources))
		for name := range containerSources {
			pipelines = append(pipelines, name)
		}
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, fmt.Errorf("manifest returned %d pipelines with containers (at most 1 is supported): %s", len(containerSources), strings.Join(pipelines, ", ")))
	}
	// The map holds at most one entry here; the loop is only used to reach it.
	for _, sources := range containerSources {
		workerResolveSpecs := make([]worker.ContainerSpec, len(sources))
		for idx, source := range sources {
			workerResolveSpecs[idx] = worker.ContainerSpec{
				Source:    source.Source,
				Name:      source.Name,
				TLSVerify: source.TLSVerify,
			}
		}

		job := worker.ContainerResolveJob{
			Arch:  ir.arch.Name(),
			Specs: workerResolveSpecs,
		}
		jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}
		containerResolveJobID = jobId
		dependencies = append(dependencies, containerResolveJobID)
		break // there can be only one
	}

	var ostreeResolveJobID uuid.UUID
	commitSources := manifestSource.Content.OSTreeCommits
	if len(commitSources) > 1 {
		// only one pipeline can specify an ostree commit for content
		pipelines := make([]string, 0, len(commitSources))
		for name := range commitSources {
			pipelines = append(pipelines, name)
		}
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, fmt.Errorf("manifest returned %d pipelines with ostree commits (at most 1 is supported): %s", len(commitSources), strings.Join(pipelines, ", ")))
	}
	// The map holds at most one entry here; the loop is only used to reach it.
	for _, sources := range commitSources {
		workerResolveSpecs := make([]worker.OSTreeResolveSpec, len(sources))
		for idx, source := range sources {
			// ostree.SourceSpec is directly convertible to worker.OSTreeResolveSpec
			workerResolveSpecs[idx] = worker.OSTreeResolveSpec(source)
		}
		jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{Specs: workerResolveSpecs}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}
		ostreeResolveJobID = jobID
		dependencies = append(dependencies, ostreeResolveJobID)
		break // there can be only one
	}

	manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	id, err = s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
		Targets: []*target.Target{ir.target},
		PipelineNames: &worker.PipelineNames{
			Build:   ir.imageType.BuildPipelines(),
			Payload: ir.imageType.PayloadPipelines(),
		},
	}, []uuid.UUID{manifestJobID}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	s.goroutinesGroup.Add(1)
	go func() {
		// Register Done before doing any work so that the wait group is
		// released on every exit path of the goroutine.
		defer s.goroutinesGroup.Done()
		generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
	}()

	return id, nil
}
// enqueueKojiCompose enqueues the chain of jobs needed to build one or more
// images for a Koji build and returns the ID of the final koji-finalize job.
//
// A single koji-init job is enqueued first. Then, for every image request, a
// depsolve job, an optional container resolve job, an optional ostree resolve
// job, a manifest job depending on all of those, and an osbuild job depending
// on the koji-init and manifest jobs are enqueued. Finally a koji-finalize job
// depending on the init job and all osbuild jobs is enqueued. All jobs are
// enqueued on the given channel.
//
// taskID, server, name, version and release are forwarded to the Koji jobs;
// name, version and release are also used to build the image file names.
// distribution supplies the module platform ID and release version for the
// depsolve jobs, and manifestSeed is forwarded to manifest generation.
//
// Each manifest job is fulfilled by a background goroutine running
// generateManifest; the goroutines are tracked by the server's wait group so
// that Shutdown can wait for them.
//
// Every failure is reported as an HTTP error wrapping ErrorEnqueueingJob.
func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
	var id uuid.UUID
	// All images of this compose are uploaded to one freshly named directory.
	kojiDirectory := "osbuild-cg/osbuild-composer-koji-" + uuid.New().String()

	initID, err := s.workers.EnqueueKojiInit(&worker.KojiInitJob{
		Server:  server,
		Name:    name,
		Version: version,
		Release: release,
	}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	var kojiFilenames []string
	var buildIDs []uuid.UUID
	for _, ir := range irs {
		// Initialise the manifest only to learn which content (packages,
		// containers, ostree commits) has to be resolved by the workers.
		manifestSource, _, err := ir.imageType.Manifest(&bp, ir.imageOptions, ir.repositories, manifestSeed)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
			PackageSets:      manifestSource.Content.PackageSets,
			ModulePlatformID: distribution.ModulePlatformID(),
			Arch:             ir.arch.Name(),
			Releasever:       distribution.Releasever(),
		}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}
		dependencies := []uuid.UUID{depsolveJobID}

		var containerResolveJobID uuid.UUID
		containerSources := manifestSource.Content.Containers
		if len(containerSources) > 1 {
			// only one pipeline can embed containers
			pipelines := make([]string, 0, len(containerSources))
			for name := range containerSources {
				pipelines = append(pipelines, name)
			}
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, fmt.Errorf("manifest returned %d pipelines with containers (at most 1 is supported): %s", len(containerSources), strings.Join(pipelines, ", ")))
		}
		// The map holds at most one entry here; the loop is only used to reach it.
		for _, sources := range containerSources {
			workerResolveSpecs := make([]worker.ContainerSpec, len(sources))
			for idx, source := range sources {
				workerResolveSpecs[idx] = worker.ContainerSpec{
					Source:    source.Source,
					Name:      source.Name,
					TLSVerify: source.TLSVerify,
				}
			}

			// Pass the specs built from the manifest content, as
			// enqueueCompose does; a slice of empty specs sized from the
			// blueprint would give the worker nothing to resolve.
			job := worker.ContainerResolveJob{
				Arch:  ir.arch.Name(),
				Specs: workerResolveSpecs,
			}
			jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
			if err != nil {
				return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
			}
			containerResolveJobID = jobId
			dependencies = append(dependencies, containerResolveJobID)
			break // there can be only one
		}

		var ostreeResolveJobID uuid.UUID
		commitSources := manifestSource.Content.OSTreeCommits
		if len(commitSources) > 1 {
			// only one pipeline can specify an ostree commit for content
			pipelines := make([]string, 0, len(commitSources))
			for name := range commitSources {
				pipelines = append(pipelines, name)
			}
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, fmt.Errorf("manifest returned %d pipelines with ostree commits (at most 1 is supported): %s", len(commitSources), strings.Join(pipelines, ", ")))
		}
		// The map holds at most one entry here; the loop is only used to reach it.
		for _, sources := range commitSources {
			workerResolveSpecs := make([]worker.OSTreeResolveSpec, len(sources))
			for idx, source := range sources {
				// ostree.SourceSpec is directly convertible to worker.OSTreeResolveSpec
				workerResolveSpecs[idx] = worker.OSTreeResolveSpec(source)
			}
			jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{Specs: workerResolveSpecs}, channel)
			if err != nil {
				return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
			}
			ostreeResolveJobID = jobID
			dependencies = append(dependencies, ostreeResolveJobID)
			break // there can be only one
		}

		manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		kojiFilename := fmt.Sprintf(
			"%s-%s-%s.%s%s",
			name,
			version,
			release,
			ir.arch.Name(),
			splitExtension(ir.imageType.Filename()),
		)

		kojiTarget := target.NewKojiTarget(&target.KojiTargetOptions{
			Server:          server,
			UploadDirectory: kojiDirectory,
		})
		kojiTarget.OsbuildArtifact.ExportFilename = ir.imageType.Filename()
		kojiTarget.OsbuildArtifact.ExportName = ir.imageType.Exports()[0]
		kojiTarget.ImageName = kojiFilename

		targets := []*target.Target{kojiTarget}
		// add any cloud upload target if defined
		if ir.target != nil {
			targets = append(targets, ir.target)
		}

		buildID, err := s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
			PipelineNames: &worker.PipelineNames{
				Build:   ir.imageType.BuildPipelines(),
				Payload: ir.imageType.PayloadPipelines(),
			},
			Targets: targets,
			// The manifest job is the second entry (index 1) of the
			// dependency list passed below.
			ManifestDynArgsIdx: common.ToPtr(1),
		}, []uuid.UUID{initID, manifestJobID}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}
		kojiFilenames = append(kojiFilenames, kojiFilename)
		buildIDs = append(buildIDs, buildID)

		// copy the image request while passing it into the goroutine to prevent data races
		s.goroutinesGroup.Add(1)
		go func(ir imageRequest) {
			// Register Done before doing any work so that the wait group is
			// released on every exit path of the goroutine.
			defer s.goroutinesGroup.Done()
			generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
		}(ir)
	}

	id, err = s.workers.EnqueueKojiFinalize(&worker.KojiFinalizeJob{
		Server:        server,
		Name:          name,
		Version:       version,
		Release:       release,
		KojiFilenames: kojiFilenames,
		KojiDirectory: kojiDirectory,
		TaskID:        taskID,
		StartTime:     uint64(time.Now().Unix()),
	}, initID, buildIDs, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	return id, nil
}
// generateManifest fulfils the manifest job identified by manifestJobID.
//
// It polls the job queue until the manifest job can be dequeued (i.e. its
// dependencies have finished), giving up when ctx is cancelled or after five
// minutes. It then reads the results of the depsolve job and, when their IDs
// are not uuid.Nil, of the container resolve and ostree resolve jobs,
// regenerates the manifest for imageType from the blueprint b, options, repos
// and seed, serializes it with the resolved content and finishes the manifest
// job with the result.
//
// The function returns nothing: once the job has been dequeued, any failure is
// recorded as the JobError of the job result and the job is still finished.
// Failures before that point are only logged.
func generateManifest(ctx context.Context, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, imageType distro.ImageType, repos []rpmmd.RepoConfig, options distro.ImageOptions, seed int64, b *blueprint.Blueprint) {
	ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
	defer cancel()

	// wait until job is in a pending state
	var token uuid.UUID
	var dynArgs []json.RawMessage
	var err error
	logWithId := logrus.WithField("jobId", manifestJobID)
	for {
		_, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID)
		if err == nil {
			break
		}
		if err != jobqueue.ErrNotPending {
			logWithId.Errorf("Error requesting manifest job: %v", err)
			return
		}

		logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish")
		// Wait before polling again, but wake up immediately when the context
		// is done. Returning here (rather than breaking out of the select)
		// is what actually ends the polling loop.
		select {
		case <-ctx.Done():
			logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
			return
		case <-time.After(time.Millisecond * 50):
		}
	}

	jobResult := &worker.ManifestJobByIDResult{
		Manifest: nil,
	}

	// From here on the job has been dequeued and must always be finished,
	// whether manifest generation succeeds or not.
	defer func() {
		if jobResult.JobError != nil {
			logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
		}

		result, err := json.Marshal(jobResult)
		if err != nil {
			logWithId.Errorf("Error marshalling manifest job results: %v", err)
		}

		err = workers.FinishJob(token, result)
		if err != nil {
			logWithId.Errorf("Error finishing manifest job: %v", err)
		}
	}()

	if len(dynArgs) == 0 {
		reason := "No dynamic arguments"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorNoDynamicArgs, reason, nil)
		return
	}

	var depsolveResults worker.DepsolveJobResult
	err = json.Unmarshal(dynArgs[0], &depsolveResults)
	if err != nil {
		reason := "Error parsing dynamic arguments"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingDynamicArgs, reason, nil)
		return
	}

	_, err = workers.DepsolveJobInfo(depsolveJobID, &depsolveResults)
	if err != nil {
		reason := "Error reading depsolve status"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
		return
	}

	if jobErr := depsolveResults.JobError; jobErr != nil {
		if jobErr.ID == clienterrors.ErrorDNFDepsolveError || jobErr.ID == clienterrors.ErrorDNFMarkingErrors {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency input, bad package set requested", nil)
			return
		}
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency", nil)
		return
	}

	if len(depsolveResults.PackageSpecs) == 0 {
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorEmptyPackageSpecs, "Received empty package specs", nil)
		return
	}

	manifest, _, err := imageType.Manifest(b, options, repos, seed)
	if err != nil {
		reason := "Error generating manifest"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason, nil)
		return
	}

	var containerSpecs map[string][]container.Spec
	if containerResolveJobID != uuid.Nil {
		// Container resolve job
		var result worker.ContainerResolveJobResult
		// Assign to the function-level err (no shadowing) so that the
		// deferred handler logs the actual failure.
		_, err = workers.ContainerResolveJobInfo(containerResolveJobID, &result)
		if err != nil {
			reason := "Error reading container resolve job status"
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
			return
		}

		if jobErr := result.JobError; jobErr != nil {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorContainerDependency, "Error in container resolve job dependency", nil)
			return
		}

		// NOTE: The container resolve job doesn't hold the pipeline name for
		// the container embedding, so we need to get it from the manifest
		// content field. There should be only one.
		var containerEmbedPipeline string
		for name := range manifest.Content.Containers {
			containerEmbedPipeline = name
			break
		}

		pipelineSpecs := make([]container.Spec, len(result.Specs))
		for idx, resultSpec := range result.Specs {
			pipelineSpecs[idx] = container.Spec{
				Source:     resultSpec.Source,
				Digest:     resultSpec.Digest,
				LocalName:  resultSpec.Name,
				TLSVerify:  resultSpec.TLSVerify,
				ImageID:    resultSpec.ImageID,
				ListDigest: resultSpec.ListDigest,
			}
		}
		containerSpecs = map[string][]container.Spec{
			containerEmbedPipeline: pipelineSpecs,
		}
	}

	var ostreeCommitSpecs map[string][]ostree.CommitSpec
	if ostreeResolveJobID != uuid.Nil {
		var result worker.OSTreeResolveJobResult
		// Assign to the function-level err (no shadowing) so that the
		// deferred handler logs the actual failure.
		_, err = workers.OSTreeResolveJobInfo(ostreeResolveJobID, &result)
		if err != nil {
			reason := "Error reading ostree resolve job status"
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
			return
		}

		if jobErr := result.JobError; jobErr != nil {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOSTreeDependency, "Error in ostree resolve job dependency", nil)
			return
		}

		// NOTE: The ostree resolve job doesn't hold the pipeline name for the
		// ostree commits, so we need to get it from the manifest content
		// field. There should be only one.
		var ostreeCommitPipeline string
		for name := range manifest.Content.OSTreeCommits {
			ostreeCommitPipeline = name
			break
		}

		commitSpecs := make([]ostree.CommitSpec, len(result.Specs))
		for idx, resultSpec := range result.Specs {
			commitSpecs[idx] = ostree.CommitSpec{
				Ref:      resultSpec.Ref,
				URL:      resultSpec.URL,
				Checksum: resultSpec.Checksum,
			}
		}
		ostreeCommitSpecs = map[string][]ostree.CommitSpec{
			ostreeCommitPipeline: commitSpecs,
		}
	}

	ms, err := manifest.Serialize(depsolveResults.PackageSpecs, containerSpecs, ostreeCommitSpecs)
	if err != nil {
		// Do not finish the job with an empty manifest and no error: report
		// the serialization failure to whoever depends on this job.
		reason := "Error serializing manifest"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason, nil)
		return
	}

	jobResult.Manifest = ms
}