Use the commit sources provided by the content of the initialised manifest, instead of the image options, to resolve commits. The ostree sources on the manifest are a map keyed by the name of the pipeline that will consume the resolved commit spec, but unlike the package sets, the worker's commit resolve job operates on a slice, so we reread the content map to recover the pipeline name. This requires that only one pipeline needs a resolved ostree commit, which is currently true of all our image types. Setting the default ostree ref on the image options before calling Manifest() is no longer necessary.
package v2

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/getkin/kin-openapi/openapi3"
	"github.com/getkin/kin-openapi/routers"
	legacyrouter "github.com/getkin/kin-openapi/routers/legacy"
	"github.com/google/uuid"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/sirupsen/logrus"

	"github.com/osbuild/osbuild-composer/pkg/jobqueue"

	"github.com/osbuild/osbuild-composer/internal/blueprint"
	"github.com/osbuild/osbuild-composer/internal/common"
	"github.com/osbuild/osbuild-composer/internal/container"
	"github.com/osbuild/osbuild-composer/internal/distro"
	"github.com/osbuild/osbuild-composer/internal/distroregistry"
	"github.com/osbuild/osbuild-composer/internal/ostree"
	"github.com/osbuild/osbuild-composer/internal/prometheus"
	"github.com/osbuild/osbuild-composer/internal/rpmmd"
	"github.com/osbuild/osbuild-composer/internal/target"
	"github.com/osbuild/osbuild-composer/internal/worker"
	"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
)
// Server represents the state of the cloud Server
type Server struct {
	workers *worker.Server
	distros *distroregistry.Registry
	config  ServerConfig
	router  routers.Router

	goroutinesCtx       context.Context
	goroutinesCtxCancel context.CancelFunc
	goroutinesGroup     sync.WaitGroup
}

type ServerConfig struct {
	TenantProviderFields []string
	JWTEnabled           bool
}
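// NewServer creates a new cloud server. It panics if the OpenAPI spec cannot
// be loaded, validated, or turned into a router, since the server cannot
// operate without it.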
func NewServer(workers *worker.Server, distros *distroregistry.Registry, config ServerConfig) *Server {
	ctx, cancel := context.WithCancel(context.Background())
	spec, err := GetSwagger()
	if err != nil {
		panic(err)
	}

	loader := openapi3.NewLoader()
	if err := spec.Validate(loader.Context); err != nil {
		panic(err)
	}

	router, err := legacyrouter.NewRouter(spec)
	if err != nil {
		panic(err)
	}

	server := &Server{
		workers: workers,
		distros: distros,
		config:  config,
		router:  router,

		goroutinesCtx:       ctx,
		goroutinesCtxCancel: cancel,
	}
	return server
}
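// Handler returns an http.Handler that serves the API under the given path
// prefix, wiring up request validation, metrics, and error handling.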
func (s *Server) Handler(path string) http.Handler {
	e := echo.New()
	e.Binder = binder{}
	e.HTTPErrorHandler = s.HTTPErrorHandler
	e.Pre(common.OperationIDMiddleware)
	e.Use(middleware.Recover())
	e.Logger = common.Logger()

	handler := apiHandlers{
		server: s,
	}

	statusMW := prometheus.StatusMiddleware(prometheus.ComposerSubsystem)
	RegisterHandlers(e.Group(path, prometheus.MetricsMiddleware, s.ValidateRequest, statusMW), &handler)

	return e
}
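// Shutdown cancels the server's background goroutines and blocks until they
// have all finished.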
func (s *Server) Shutdown() {
	s.goroutinesCtxCancel()
	s.goroutinesGroup.Wait()
}
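// enqueueCompose enqueues the job chain for a single image request: a
// depsolve job, optional container and ostree resolve jobs, a manifest job
// depending on all of them, and the osbuild job itself. It also starts a
// goroutine that generates the manifest once the dependencies finish.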
func (s *Server) enqueueCompose(distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
	var id uuid.UUID
	if len(irs) != 1 {
		return id, HTTPError(ErrorInvalidNumberOfImageBuilds)
	}
	ir := irs[0]

	manifestSource, _, err := ir.imageType.Manifest(&bp, ir.imageOptions, ir.repositories, manifestSeed)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
		PackageSets:      manifestSource.Content.PackageSets,
		ModulePlatformID: distribution.ModulePlatformID(),
		Arch:             ir.arch.Name(),
		Releasever:       distribution.Releasever(),
	}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	dependencies := []uuid.UUID{depsolveJobID}
	var containerResolveJob uuid.UUID
	if len(bp.Containers) > 0 {
		job := worker.ContainerResolveJob{
			Arch:  ir.arch.Name(),
			Specs: make([]worker.ContainerSpec, len(bp.Containers)),
		}

		for i, c := range bp.Containers {
			job.Specs[i] = worker.ContainerSpec{
				Source:    c.Source,
				Name:      c.Name,
				TLSVerify: c.TLSVerify,
			}
		}

		jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		containerResolveJob = jobId
		dependencies = append(dependencies, jobId)
	}
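	// The manifest content lists ostree commit sources in a map keyed by the
	// name of the pipeline that will use the resolved commit, but the
	// worker's resolve job takes a flat slice. The map is flattened here and
	// the pipeline name is recovered later in generateManifest by rereading
	// the content map, which is why at most one pipeline may carry commits.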
	var ostreeResolveJobID uuid.UUID
	commitSources := manifestSource.Content.OSTreeCommits
	if len(commitSources) > 1 {
		// only one pipeline can specify an ostree commit for content
		pipelines := make([]string, 0, len(commitSources))
		for name := range commitSources {
			pipelines = append(pipelines, name)
		}
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, fmt.Errorf("manifest returned %d pipelines with ostree commits (at most 1 is supported): %s", len(commitSources), strings.Join(pipelines, ", ")))
	}
	for _, sources := range commitSources {
		workerResolveSpecs := make([]worker.OSTreeResolveSpec, len(sources))
		for idx, source := range sources {
			// ostree.SourceSpec is directly convertible to worker.OSTreeResolveSpec
			workerResolveSpecs[idx] = worker.OSTreeResolveSpec(source)
		}
		jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{Specs: workerResolveSpecs}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		ostreeResolveJobID = jobID
		dependencies = append(dependencies, ostreeResolveJobID)
		break // there can be only one
	}
	manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	id, err = s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
		Targets: []*target.Target{ir.target},
		PipelineNames: &worker.PipelineNames{
			Build:   ir.imageType.BuildPipelines(),
			Payload: ir.imageType.PayloadPipelines(),
		},
	}, []uuid.UUID{manifestJobID}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	s.goroutinesGroup.Add(1)
	go func() {
		defer s.goroutinesGroup.Done()
		generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJob, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
	}()

	return id, nil
}
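// enqueueKojiCompose enqueues a Koji init job, then for each image request
// the same depsolve/resolve/manifest/osbuild chain as enqueueCompose (with a
// Koji upload target), and finally a Koji finalize job that depends on all
// of the builds.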
func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
	var id uuid.UUID
	kojiDirectory := "osbuild-cg/osbuild-composer-koji-" + uuid.New().String()

	initID, err := s.workers.EnqueueKojiInit(&worker.KojiInitJob{
		Server:  server,
		Name:    name,
		Version: version,
		Release: release,
	}, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	var kojiFilenames []string
	var buildIDs []uuid.UUID
	for _, ir := range irs {
		manifestSource, _, err := ir.imageType.Manifest(&bp, ir.imageOptions, ir.repositories, manifestSeed)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		depsolveJobID, err := s.workers.EnqueueDepsolve(&worker.DepsolveJob{
			PackageSets:      manifestSource.Content.PackageSets,
			ModulePlatformID: distribution.ModulePlatformID(),
			Arch:             ir.arch.Name(),
			Releasever:       distribution.Releasever(),
		}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		var containerResolveJob uuid.UUID
		dependencies := []uuid.UUID{depsolveJobID}
		if len(bp.Containers) > 0 {
			job := worker.ContainerResolveJob{
				Arch:  ir.arch.Name(),
				Specs: make([]worker.ContainerSpec, len(bp.Containers)),
			}

			for i, c := range bp.Containers {
				job.Specs[i] = worker.ContainerSpec{
					Source:    c.Source,
					Name:      c.Name,
					TLSVerify: c.TLSVerify,
				}
			}

			jobId, err := s.workers.EnqueueContainerResolveJob(&job, channel)
			if err != nil {
				return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
			}

			containerResolveJob = jobId
			dependencies = append(dependencies, jobId)
		}
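		// Flatten the pipeline-keyed ostree commit sources exactly as in
		// enqueueCompose; the same single-pipeline restriction applies.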
		var ostreeResolveJobID uuid.UUID
		commitSources := manifestSource.Content.OSTreeCommits
		if len(commitSources) > 1 {
			// only one pipeline can specify an ostree commit for content
			pipelines := make([]string, 0, len(commitSources))
			for name := range commitSources {
				pipelines = append(pipelines, name)
			}
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, fmt.Errorf("manifest returned %d pipelines with ostree commits (at most 1 is supported): %s", len(commitSources), strings.Join(pipelines, ", ")))
		}
		for _, sources := range commitSources {
			workerResolveSpecs := make([]worker.OSTreeResolveSpec, len(sources))
			for idx, source := range sources {
				// ostree.SourceSpec is directly convertible to worker.OSTreeResolveSpec
				workerResolveSpecs[idx] = worker.OSTreeResolveSpec(source)
			}
			jobID, err := s.workers.EnqueueOSTreeResolveJob(&worker.OSTreeResolveJob{Specs: workerResolveSpecs}, channel)
			if err != nil {
				return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
			}

			ostreeResolveJobID = jobID
			dependencies = append(dependencies, ostreeResolveJobID)
			break // there can be only one
		}
		manifestJobID, err := s.workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, dependencies, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}

		kojiFilename := fmt.Sprintf(
			"%s-%s-%s.%s%s",
			name,
			version,
			release,
			ir.arch.Name(),
			splitExtension(ir.imageType.Filename()),
		)

		kojiTarget := target.NewKojiTarget(&target.KojiTargetOptions{
			Server:          server,
			UploadDirectory: kojiDirectory,
		})
		kojiTarget.OsbuildArtifact.ExportFilename = ir.imageType.Filename()
		kojiTarget.OsbuildArtifact.ExportName = ir.imageType.Exports()[0]
		kojiTarget.ImageName = kojiFilename

		targets := []*target.Target{kojiTarget}
		// add any cloud upload target if defined
		if ir.target != nil {
			targets = append(targets, ir.target)
		}

		buildID, err := s.workers.EnqueueOSBuildAsDependency(ir.arch.Name(), &worker.OSBuildJob{
			PipelineNames: &worker.PipelineNames{
				Build:   ir.imageType.BuildPipelines(),
				Payload: ir.imageType.PayloadPipelines(),
			},
			Targets:            targets,
			ManifestDynArgsIdx: common.ToPtr(1),
		}, []uuid.UUID{initID, manifestJobID}, channel)
		if err != nil {
			return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
		}
		kojiFilenames = append(kojiFilenames, kojiFilename)
		buildIDs = append(buildIDs, buildID)

		// copy the image request while passing it into the goroutine to prevent data races
		s.goroutinesGroup.Add(1)
		go func(ir imageRequest) {
			defer s.goroutinesGroup.Done()
			generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, containerResolveJob, ostreeResolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, &bp)
		}(ir)
	}
	id, err = s.workers.EnqueueKojiFinalize(&worker.KojiFinalizeJob{
		Server:        server,
		Name:          name,
		Version:       version,
		Release:       release,
		KojiFilenames: kojiFilenames,
		KojiDirectory: kojiDirectory,
		TaskID:        taskID,
		StartTime:     uint64(time.Now().Unix()),
	}, initID, buildIDs, channel)
	if err != nil {
		return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
	}

	return id, nil
}
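// generateManifest waits for the manifest job to become pending, collects
// the results of its depsolve, container resolve, and ostree resolve
// dependencies, regenerates the manifest with the resolved content, and
// finishes the job with the serialized manifest.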
func generateManifest(ctx context.Context, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, imageType distro.ImageType, repos []rpmmd.RepoConfig, options distro.ImageOptions, seed int64, b *blueprint.Blueprint) {
	ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
	defer cancel()

	// wait until job is in a pending state
	var token uuid.UUID
	var dynArgs []json.RawMessage
	var err error
	logWithId := logrus.WithField("jobId", manifestJobID)
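	// RequestJobById returns jobqueue.ErrNotPending until all of the manifest
	// job's dependencies have finished, so poll every 50ms. If the five-minute
	// timeout fires or the server shuts down, the select below breaks out and
	// the error check logs the still-pending error and returns.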
	for {
		_, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID)
		if err == jobqueue.ErrNotPending {
			logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish")
			time.Sleep(time.Millisecond * 50)
			select {
			case <-ctx.Done():
				logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
				break
			default:
				continue
			}
		}
		if err != nil {
			logWithId.Errorf("Error requesting manifest job: %v", err)
			return
		}
		break
	}
	jobResult := &worker.ManifestJobByIDResult{
		Manifest: nil,
	}

	defer func() {
		if jobResult.JobError != nil {
			logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
		}

		result, err := json.Marshal(jobResult)
		if err != nil {
			logWithId.Errorf("Error marshalling manifest job results: %v", err)
		}

		err = workers.FinishJob(token, result)
		if err != nil {
			logWithId.Errorf("Error finishing manifest job: %v", err)
		}
	}()

	if len(dynArgs) == 0 {
		reason := "No dynamic arguments"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorNoDynamicArgs, reason, nil)
		return
	}

	var depsolveResults worker.DepsolveJobResult
	err = json.Unmarshal(dynArgs[0], &depsolveResults)
	if err != nil {
		reason := "Error parsing dynamic arguments"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorParsingDynamicArgs, reason, nil)
		return
	}

	_, err = workers.DepsolveJobInfo(depsolveJobID, &depsolveResults)
	if err != nil {
		reason := "Error reading depsolve status"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
		return
	}

	if jobErr := depsolveResults.JobError; jobErr != nil {
		if jobErr.ID == clienterrors.ErrorDNFDepsolveError || jobErr.ID == clienterrors.ErrorDNFMarkingErrors {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency input, bad package set requested", nil)
			return
		}
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDepsolveDependency, "Error in depsolve job dependency", nil)
		return
	}

	if len(depsolveResults.PackageSpecs) == 0 {
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorEmptyPackageSpecs, "Received empty package specs", nil)
		return
	}

	var containerSpecs []container.Spec
	if containerResolveJobID != uuid.Nil {
		// Container resolve job
		var result worker.ContainerResolveJobResult

		_, err := workers.ContainerResolveJobInfo(containerResolveJobID, &result)
		if err != nil {
			reason := "Error reading container resolve job status"
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
			return
		}

		if jobErr := result.JobError; jobErr != nil {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorContainerDependency, "Error in container resolve job dependency", nil)
			return
		}

		containerSpecs = make([]container.Spec, len(result.Specs))

		for i, s := range result.Specs {
			containerSpecs[i].Source = s.Source
			containerSpecs[i].Digest = s.Digest
			containerSpecs[i].LocalName = s.Name
			containerSpecs[i].TLSVerify = s.TLSVerify
			containerSpecs[i].ImageID = s.ImageID
			containerSpecs[i].ListDigest = s.ListDigest
		}
	}

	manifest, _, err := imageType.Manifest(b, options, repos, seed)
	if err != nil {
		reason := "Error generating manifest"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason, nil)
		return
	}
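	// Collect the resolved ostree commits, if a resolve job was queued, keyed
	// by the single pipeline that consumes them.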
	var ostreeCommitSpecs map[string][]ostree.CommitSpec
	if ostreeResolveJobID != uuid.Nil {
		var result worker.OSTreeResolveJobResult
		_, err := workers.OSTreeResolveJobInfo(ostreeResolveJobID, &result)
		if err != nil {
			reason := "Error reading ostree resolve job status"
			logrus.Errorf("%s: %v", reason, err)
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason, nil)
			return
		}

		if jobErr := result.JobError; jobErr != nil {
			jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOSTreeDependency, "Error in ostree resolve job dependency", nil)
			return
		}

		// NOTE: The ostree resolve job doesn't hold the pipeline name for the
		// ostree commits, so we need to get it from the manifest content
		// field. There should be only one.
		var ostreeCommitPipeline string
		for name := range manifest.Content.OSTreeCommits {
			ostreeCommitPipeline = name
			break
		}

		commitSpecs := make([]ostree.CommitSpec, len(result.Specs))
		for idx, resultSpec := range result.Specs {
			commitSpecs[idx] = ostree.CommitSpec{
				Ref:      resultSpec.Ref,
				URL:      resultSpec.URL,
				Checksum: resultSpec.Checksum,
			}
		}
		ostreeCommitSpecs = map[string][]ostree.CommitSpec{
			ostreeCommitPipeline: commitSpecs,
		}
	}

	// NOTE: This assumes that containers are only embedded in the first
	// payload pipeline, which is currently true for all image types but might
	// not necessarily be in the future. This is a workaround required for this
	// temporary state where the cloud API is not using the new manifest
	// generation procedure. Once it's updated, the container specs will be
	// mapped to pipeline names properly by the image type itself.
	var payloadPipelineName string
	if pipelineNames := imageType.PayloadPipelines(); len(pipelineNames) > 0 {
		payloadPipelineName = pipelineNames[0]
	} else {
		panic(fmt.Sprintf("ImageType %q does not define payload pipelines - this is a programming error", imageType.Name()))
	}
	// TODO: resolve ostree source spec from manifest content and pass here.
	ms, err := manifest.Serialize(depsolveResults.PackageSpecs, map[string][]container.Spec{payloadPipelineName: containerSpecs}, ostreeCommitSpecs)
	if err != nil {
		reason := "Error serializing manifest"
		jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorManifestGeneration, reason, nil)
		return
	}

	jobResult.Manifest = ms
}