store: refactor compose handling

All the preceding changes in jobqueue, compose package, common types
etc. were a base for refactoring the store so that a compose can handle
multiple image builds. This commit introduces the change in a backwards
compatible way, so that the weldr api don't change.
This commit is contained in:
Martin Sehnoutka 2020-02-10 09:40:33 +01:00 committed by Ondřej Budai
parent d02bac15fe
commit 962365251d

View file

@ -10,6 +10,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/osbuild/osbuild-composer/internal/compose"
"io"
"io/ioutil"
"log"
@ -36,46 +37,26 @@ import (
type Store struct {
Blueprints map[string]blueprint.Blueprint `json:"blueprints"`
Workspace map[string]blueprint.Blueprint `json:"workspace"`
Composes map[uuid.UUID]Compose `json:"composes"`
Composes map[uuid.UUID]compose.Compose `json:"composes"`
Sources map[string]SourceConfig `json:"sources"`
BlueprintsChanges map[string]map[string]blueprint.Change `json:"changes"`
mu sync.RWMutex // protects all fields
pendingJobs chan Job
stateChannel chan []byte
distro distro.Distro
stateDir *string
}
// A Compose represent the task of building one image. It contains all the information
// necessary to generate the inputs for the job, as well as the job's state.
type Compose struct {
QueueStatus string `json:"queue_status"`
Blueprint *blueprint.Blueprint `json:"blueprint"`
OutputType string `json:"output-type"`
Pipeline *pipeline.Pipeline `json:"pipeline"`
Targets []*target.Target `json:"targets"`
JobCreated time.Time `json:"job_created"`
JobStarted time.Time `json:"job_started"`
JobFinished time.Time `json:"job_finished"`
Image *Image `json:"image"`
Size uint64 `json:"size"`
mu sync.RWMutex // protects all fields
pendingJobs chan Job
stateChannel chan []byte
distro distro.Distro
distroRegistry distro.Registry
stateDir *string
}
// A Job contains the information about a compose a worker needs to process it.
type Job struct {
ComposeID uuid.UUID
Distro string
Pipeline *pipeline.Pipeline
Targets []*target.Target
OutputType string
}
// An Image represents the image resulting from a compose.
type Image struct {
Path string
Mime string
Size int64
ComposeID uuid.UUID
ImageBuildID int
Distro string
Pipeline *pipeline.Pipeline
Targets []*target.Target
ImageType string
}
type SourceConfig struct {
@ -119,7 +100,7 @@ func (e *InvalidRequestError) Error() string {
return e.message
}
func New(stateDir *string, distro distro.Distro) *Store {
func New(stateDir *string, distroArg distro.Distro, distroRegistryArg distro.Registry) *Store {
var s Store
if stateDir != nil {
@ -153,7 +134,8 @@ func New(stateDir *string, distro distro.Distro) *Store {
}
s.pendingJobs = make(chan Job, 200)
s.distro = distro
s.distro = distroArg
s.distroRegistry = distroRegistryArg
s.stateDir = stateDir
if s.Blueprints == nil {
@ -163,22 +145,36 @@ func New(stateDir *string, distro distro.Distro) *Store {
s.Workspace = make(map[string]blueprint.Blueprint)
}
if s.Composes == nil {
s.Composes = make(map[uuid.UUID]Compose)
s.Composes = make(map[uuid.UUID]compose.Compose)
} else {
for composeID, compose := range s.Composes {
switch compose.QueueStatus {
case "RUNNING":
// We do not support resuming an in-flight build
compose.QueueStatus = "FAILED"
s.Composes[composeID] = compose
case "WAITING":
// Push waiting composes back into the pending jobs queue
s.pendingJobs <- Job{
ComposeID: composeID,
Distro: s.distro.Name(),
Pipeline: compose.Pipeline,
Targets: compose.Targets,
OutputType: compose.OutputType,
if len(compose.ImageBuilds) == 0 {
panic("the was a compose with zero image builds, that is forbidden")
}
for _, imgBuild := range compose.ImageBuilds {
switch imgBuild.QueueStatus {
case common.IBRunning:
// We do not support resuming an in-flight build
imgBuild.QueueStatus = common.IBFailed
// s.Composes[composeID] = compose
case common.IBWaiting:
// Push waiting composes back into the pending jobs queue
distroStr, exists := imgBuild.Distro.ToString()
if !exists {
panic("fatal error, distro tag should exist but does not")
}
imageTypeCompat, exists := imgBuild.ImageType.ToCompatString()
if !exists {
panic("fatal error, image type tag should exist but does not")
}
s.pendingJobs <- Job{
ComposeID: composeID,
ImageBuildID: imgBuild.Id,
Distro: distroStr,
Pipeline: imgBuild.Pipeline,
Targets: imgBuild.Targets,
ImageType: imageTypeCompat,
}
}
}
}
@ -203,7 +199,8 @@ func writeFileAtomically(filename string, data []byte, mode os.FileMode) error {
_, err = tmpfile.Write(data)
if err != nil {
os.Remove(tmpfile.Name())
// FIXME: handle or comment this possible error
_ = os.Remove(tmpfile.Name())
return err
}
@ -214,13 +211,15 @@ func writeFileAtomically(filename string, data []byte, mode os.FileMode) error {
err = tmpfile.Close()
if err != nil {
os.Remove(tmpfile.Name())
// FIXME: handle or comment this possible error
_ = os.Remove(tmpfile.Name())
return err
}
err = os.Rename(tmpfile.Name(), filename)
if err != nil {
os.Remove(tmpfile.Name())
// FIXME: handle or comment this possible error
_ = os.Remove(tmpfile.Name())
return err
}
@ -367,7 +366,8 @@ func bumpVersion(str string) string {
}
func (s *Store) PushBlueprint(bp blueprint.Blueprint, commitMsg string) {
s.change(func() error {
// FIXME: handle or comment this possible error
_ = s.change(func() error {
commit, err := randomSHA1String()
if err != nil {
return err
@ -397,14 +397,16 @@ func (s *Store) PushBlueprint(bp blueprint.Blueprint, commitMsg string) {
}
func (s *Store) PushBlueprintToWorkspace(bp blueprint.Blueprint) {
s.change(func() error {
// FIXME: handle or comment this possible error
_ = s.change(func() error {
s.Workspace[bp.Name] = bp
return nil
})
}
func (s *Store) DeleteBlueprint(name string) {
s.change(func() error {
// FIXME: handle or comment this possible error
_ = s.change(func() error {
delete(s.Workspace, name)
delete(s.Blueprints, name)
return nil
@ -412,13 +414,14 @@ func (s *Store) DeleteBlueprint(name string) {
}
func (s *Store) DeleteBlueprintFromWorkspace(name string) {
s.change(func() error {
// FIXME: handle or comment this possible error
_ = s.change(func() error {
delete(s.Workspace, name)
return nil
})
}
func (s *Store) GetCompose(id uuid.UUID) (Compose, bool) {
func (s *Store) GetCompose(id uuid.UUID) (compose.Compose, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
@ -426,24 +429,16 @@ func (s *Store) GetCompose(id uuid.UUID) (Compose, bool) {
return compose, exists
}
func (s *Store) GetAllComposes() map[uuid.UUID]Compose {
// GetAllComposes creates a deep copy of all composes present in this store
// and returns them as a dictionary with compose UUIDs as keys
func (s *Store) GetAllComposes() map[uuid.UUID]compose.Compose {
s.mu.RLock()
defer s.mu.RUnlock()
composes := make(map[uuid.UUID]Compose)
for id, compose := range s.Composes {
newCompose := compose
newCompose.Targets = []*target.Target{}
for _, t := range compose.Targets {
newTarget := *t
newCompose.Targets = append(newCompose.Targets, &newTarget)
}
newBlueprint := *compose.Blueprint
newCompose.Blueprint = &newBlueprint
composes := make(map[uuid.UUID]compose.Compose)
for id, singleCompose := range s.Composes {
newCompose := singleCompose.DeepCopy()
composes[id] = newCompose
}
@ -457,11 +452,24 @@ func (s *Store) GetComposeResult(id uuid.UUID) (io.ReadCloser, error) {
return os.Open(*s.stateDir + "/outputs/" + id.String() + "/result.json")
}
func (s *Store) getImageLocationForLocalTarget(composeID uuid.UUID) string {
// This might result in conflicts because one compose can have multiple images, but (!) the Weldr API
// does not support multiple images per build and the RCM API does not support Local target, so it does
// not really matter any more.
return *s.stateDir + "/outputs/" + composeID.String()
}
func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, checksums map[string]string, arch, composeType string, size uint64, uploadTarget *target.Target) error {
targets := []*target.Target{}
// Compatibility layer for image types in Weldr API v0
imageType, exists := common.ImageTypeFromCompatString(composeType)
if !exists {
panic("fatal error, compose type does not exist")
}
if s.stateDir != nil {
outputDir := *s.stateDir + "/outputs/" + composeID.String()
outputDir := s.getImageLocationForLocalTarget(composeID)
err := os.MkdirAll(outputDir, 0755)
if err != nil {
@ -475,6 +483,8 @@ func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, checks
))
}
size = s.distro.GetSizeForOutputType(composeType, size)
if uploadTarget != nil {
targets = append(targets, uploadTarget)
}
@ -484,35 +494,119 @@ func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, checks
repos = append(repos, source.RepoConfig())
}
size = s.distro.GetSizeForOutputType(composeType, size)
pipeline, err := s.distro.Pipeline(bp, repos, checksums, arch, composeType, size)
pipelineStruct, err := s.distro.Pipeline(bp, repos, checksums, arch, composeType, size)
if err != nil {
return err
}
s.change(func() error {
s.Composes[composeID] = Compose{
QueueStatus: "WAITING",
Blueprint: bp,
Pipeline: pipeline,
OutputType: composeType,
Targets: targets,
JobCreated: time.Now(),
Size: size,
// FIXME: handle or comment this possible error
_ = s.change(func() error {
s.Composes[composeID] = compose.Compose{
Blueprint: bp,
ImageBuilds: []compose.ImageBuild{
{
QueueStatus: common.IBWaiting,
Pipeline: pipelineStruct,
ImageType: imageType,
Targets: targets,
JobCreated: time.Now(),
Size: size,
},
},
}
return nil
})
s.pendingJobs <- Job{
ComposeID: composeID,
Distro: s.distro.Name(),
Pipeline: pipeline,
Targets: targets,
OutputType: composeType,
ComposeID: composeID,
ImageBuildID: 0,
Distro: s.distro.Name(),
Pipeline: pipelineStruct,
Targets: targets,
ImageType: composeType,
}
return nil
}
// PushComposeRequest is an alternative to PushCompose which does not assume a pre-defined distro, as such it is better
// suited for RCM API and possible future API that would respect the fact that we can build any distro and any arch
func (s *Store) PushComposeRequest(request common.ComposeRequest) error {
// This should never happen and once distro.Pipeline is refactored this check will go away
arch, exists := request.Arch.ToString()
if !exists {
panic("fatal error, arch should exist but it does not")
}
distroString, exists := request.Distro.ToString()
if !exists {
panic("fatal error, distro should exist but it does not")
}
distroStruct := s.distroRegistry.GetDistro(distroString)
if distroStruct == nil {
panic("fatal error, distro should exist but it is not in the registry")
}
// This will be a list of imageBuilds that will be submitted to the state channel
imageBuilds := []compose.ImageBuild{}
newJobs := []Job{}
// TODO: remove this
if len(request.RequestedImages) > 1 {
panic("Multiple image requests are not yet properly implemented")
}
for n, imageRequest := range request.RequestedImages {
// TODO: handle custom upload targets
// TODO: this requires changes in the Compose Request struct
// TODO: implment support for no checksums
// This should never happen and once distro.Pipeline is refactored this check will go away
imgTypeCompatStr, exists := imageRequest.ImgType.ToCompatString()
if !exists {
panic("fatal error, image type should exist but it does not")
}
pipelineStruct, err := distroStruct.Pipeline(&request.Blueprint, request.Repositories, nil, arch, imgTypeCompatStr, 0)
if err != nil {
return err
}
// This will make the job submission atomic: either all of them or none of them
newJobs = append(newJobs, Job{
ComposeID: request.ComposeID,
ImageBuildID: n,
Distro: distroString,
Pipeline: pipelineStruct,
Targets: []*target.Target{},
ImageType: imgTypeCompatStr,
})
imageBuilds = append(imageBuilds, compose.ImageBuild{
Distro: request.Distro,
QueueStatus: common.IBWaiting,
ImageType: imageRequest.ImgType,
Pipeline: pipelineStruct,
Targets: []*target.Target{},
JobCreated: time.Now(),
})
}
// submit all the jobs now
for _, job := range newJobs {
s.pendingJobs <- job
}
// ignore error because the previous implementation does the same
_ = s.change(func() error {
s.Composes[request.ComposeID] = compose.Compose{
Blueprint: &request.Blueprint,
ImageBuilds: imageBuilds,
}
return nil
})
return nil
}
// DeleteCompose deletes the compose from the state file and also removes all files on disk that are
// associated with this compose
func (s *Store) DeleteCompose(id uuid.UUID) error {
return s.change(func() error {
compose, exists := s.Composes[id]
@ -521,7 +615,14 @@ func (s *Store) DeleteCompose(id uuid.UUID) error {
return &NotFoundError{}
}
if compose.QueueStatus != "FINISHED" && compose.QueueStatus != "FAILED" {
// If any of the image builds have build artifacts, remove them
invalidRequest := true
for _, imageBuild := range compose.ImageBuilds {
if imageBuild.QueueStatus == common.IBFinished || imageBuild.QueueStatus == common.IBFailed {
invalidRequest = false
}
}
if invalidRequest {
return &InvalidRequestError{fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id)}
}
@ -529,92 +630,115 @@ func (s *Store) DeleteCompose(id uuid.UUID) error {
var err error
if s.stateDir != nil {
err = os.RemoveAll(*s.stateDir + "/outputs/" + id.String())
// TODO: we need to rename the files as the compose will have multiple images
for _, imageBuild := range compose.ImageBuilds {
if imageBuild.QueueStatus == common.IBFinished || imageBuild.QueueStatus == common.IBFailed {
err = os.RemoveAll(s.getImageLocationForLocalTarget(id))
if err != nil {
return err
}
}
}
}
return err
})
}
func (s *Store) PopCompose() Job {
// PopJob returns a job from the job queue and changes the status of the corresponding image build to running
func (s *Store) PopJob() Job {
job := <-s.pendingJobs
s.change(func() error {
// FIXME: handle or comment this possible error
_ = s.change(func() error {
// Get the compose from the map
compose, exists := s.Composes[job.ComposeID]
if !exists || compose.QueueStatus != "WAITING" {
// Check that it exists
if !exists {
panic("Invalid job in queue.")
}
compose.JobStarted = time.Now()
compose.QueueStatus = "RUNNING"
for _, t := range compose.Targets {
t.Status = "RUNNING"
// Loop over the image builds and find the right one according to the image type
// Change queue status to running for the image build as well as for the targets
for n, imageBuild := range compose.ImageBuilds {
imgTypeCompatStr, exists := imageBuild.ImageType.ToCompatString()
if !exists {
panic("fatal error, image type should exist but it does not")
}
if imgTypeCompatStr != job.ImageType {
continue
}
if imageBuild.QueueStatus != common.IBWaiting {
panic("Invalid job in queue.")
}
compose.ImageBuilds[n].QueueStatus = common.IBRunning
for m := range compose.ImageBuilds[n].Targets {
compose.ImageBuilds[n].Targets[m].Status = common.IBRunning
}
}
// Replace the compose struct with the new one
// TODO: I'm not sure this is needed, but I don't know what is the golang semantics in this case
s.Composes[job.ComposeID] = compose
return nil
})
return job
}
func (s *Store) UpdateCompose(composeID uuid.UUID, status string, image *Image, result *common.ComposeResult) error {
// UpdateImageBuildInCompose sets the status and optionally also the final image.
func (s *Store) UpdateImageBuildInCompose(composeID uuid.UUID, imageBuildID int, status common.ImageBuildState, image *compose.Image, result *common.ComposeResult) error {
return s.change(func() error {
compose, exists := s.Composes[composeID]
// Check that the compose exists
currentCompose, exists := s.Composes[composeID]
if !exists {
return &NotFoundError{"compose does not exist"}
}
if compose.QueueStatus == "WAITING" {
// Check that the image build was waiting
if currentCompose.ImageBuilds[imageBuildID].QueueStatus == common.IBWaiting {
return &NotPendingError{"compose has not been popped"}
}
// write result into file
if s.stateDir != nil && result != nil {
f, err := os.Create(*s.stateDir + "/outputs/" + composeID.String() + "/result.json")
f, err := os.Create(s.getImageLocationForLocalTarget(composeID) + "/result.json")
if err != nil {
return fmt.Errorf("cannot open result.json for job %v: %#v", composeID, err)
}
json.NewEncoder(f).Encode(result)
// FIXME: handle error
_ = json.NewEncoder(f).Encode(result)
}
switch status {
case "RUNNING":
switch compose.QueueStatus {
case "RUNNING":
default:
return &NotRunningError{"compose was not running"}
}
case "FINISHED", "FAILED":
switch compose.QueueStatus {
case "RUNNING":
compose.JobFinished = time.Now()
default:
return &NotRunningError{"compose was not running"}
}
compose.QueueStatus = status
for _, t := range compose.Targets {
t.Status = status
}
if status == "FINISHED" {
compose.Image = image
}
s.Composes[composeID] = compose
default:
return &InvalidRequestError{"invalid state transition"}
// Update the image build state including all target states
err := currentCompose.UpdateState(imageBuildID, status)
if err != nil {
// TODO: log error
return &InvalidRequestError{"invalid state transition: " + err.Error()}
}
// In case the image build is done, store the time and possibly also the image
if status == common.IBFinished || status == common.IBFailed {
currentCompose.ImageBuilds[imageBuildID].JobFinished = time.Now()
if status == common.IBFinished {
currentCompose.ImageBuilds[imageBuildID].Image = image
}
}
s.Composes[composeID] = currentCompose
return nil
})
}
func (s *Store) PushSource(source SourceConfig) {
s.change(func() error {
// FIXME: handle or comment this possible error
_ = s.change(func() error {
s.Sources[source.Name] = source
return nil
})
}
func (s *Store) DeleteSource(name string) {
s.change(func() error {
// FIXME: handle or comment this possible error
_ = s.change(func() error {
delete(s.Sources, name)
return nil
})