diff --git a/internal/store/store.go b/internal/store/store.go index 80e44f732..2408e22c5 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -10,6 +10,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/osbuild/osbuild-composer/internal/compose" "io" "io/ioutil" "log" @@ -36,46 +37,26 @@ import ( type Store struct { Blueprints map[string]blueprint.Blueprint `json:"blueprints"` Workspace map[string]blueprint.Blueprint `json:"workspace"` - Composes map[uuid.UUID]Compose `json:"composes"` + Composes map[uuid.UUID]compose.Compose `json:"composes"` Sources map[string]SourceConfig `json:"sources"` BlueprintsChanges map[string]map[string]blueprint.Change `json:"changes"` - mu sync.RWMutex // protects all fields - pendingJobs chan Job - stateChannel chan []byte - distro distro.Distro - stateDir *string -} - -// A Compose represent the task of building one image. It contains all the information -// necessary to generate the inputs for the job, as well as the job's state. -type Compose struct { - QueueStatus string `json:"queue_status"` - Blueprint *blueprint.Blueprint `json:"blueprint"` - OutputType string `json:"output-type"` - Pipeline *pipeline.Pipeline `json:"pipeline"` - Targets []*target.Target `json:"targets"` - JobCreated time.Time `json:"job_created"` - JobStarted time.Time `json:"job_started"` - JobFinished time.Time `json:"job_finished"` - Image *Image `json:"image"` - Size uint64 `json:"size"` + mu sync.RWMutex // protects all fields + pendingJobs chan Job + stateChannel chan []byte + distro distro.Distro + distroRegistry distro.Registry + stateDir *string } // A Job contains the information about a compose a worker needs to process it. type Job struct { - ComposeID uuid.UUID - Distro string - Pipeline *pipeline.Pipeline - Targets []*target.Target - OutputType string -} - -// An Image represents the image resulting from a compose. -type Image struct { - Path string - Mime string - Size int64 + ComposeID uuid.UUID + ImageBuildID int + Distro string + Pipeline *pipeline.Pipeline + Targets []*target.Target + ImageType string } type SourceConfig struct { @@ -119,7 +100,7 @@ func (e *InvalidRequestError) Error() string { return e.message } -func New(stateDir *string, distro distro.Distro) *Store { +func New(stateDir *string, distroArg distro.Distro, distroRegistryArg distro.Registry) *Store { var s Store if stateDir != nil { @@ -153,7 +134,8 @@ func New(stateDir *string, distro distro.Distro) *Store { } s.pendingJobs = make(chan Job, 200) - s.distro = distro + s.distro = distroArg + s.distroRegistry = distroRegistryArg s.stateDir = stateDir if s.Blueprints == nil { @@ -163,22 +145,36 @@ func New(stateDir *string, distro distro.Distro) *Store { s.Workspace = make(map[string]blueprint.Blueprint) } if s.Composes == nil { - s.Composes = make(map[uuid.UUID]Compose) + s.Composes = make(map[uuid.UUID]compose.Compose) } else { for composeID, compose := range s.Composes { - switch compose.QueueStatus { - case "RUNNING": - // We do not support resuming an in-flight build - compose.QueueStatus = "FAILED" - s.Composes[composeID] = compose - case "WAITING": - // Push waiting composes back into the pending jobs queue - s.pendingJobs <- Job{ - ComposeID: composeID, - Distro: s.distro.Name(), - Pipeline: compose.Pipeline, - Targets: compose.Targets, - OutputType: compose.OutputType, + if len(compose.ImageBuilds) == 0 { + panic("the was a compose with zero image builds, that is forbidden") + } + for _, imgBuild := range compose.ImageBuilds { + switch imgBuild.QueueStatus { + case common.IBRunning: + // We do not support resuming an in-flight build + imgBuild.QueueStatus = common.IBFailed + // s.Composes[composeID] = compose + case common.IBWaiting: + // Push waiting composes back into the pending jobs queue + distroStr, exists := imgBuild.Distro.ToString() + if !exists { + panic("fatal error, distro tag should exist but does not") + } + imageTypeCompat, exists := imgBuild.ImageType.ToCompatString() + if !exists { + panic("fatal error, image type tag should exist but does not") + } + s.pendingJobs <- Job{ + ComposeID: composeID, + ImageBuildID: imgBuild.Id, + Distro: distroStr, + Pipeline: imgBuild.Pipeline, + Targets: imgBuild.Targets, + ImageType: imageTypeCompat, + } } } } @@ -203,7 +199,8 @@ func writeFileAtomically(filename string, data []byte, mode os.FileMode) error { _, err = tmpfile.Write(data) if err != nil { - os.Remove(tmpfile.Name()) + // FIXME: handle or comment this possible error + _ = os.Remove(tmpfile.Name()) return err } @@ -214,13 +211,15 @@ func writeFileAtomically(filename string, data []byte, mode os.FileMode) error { err = tmpfile.Close() if err != nil { - os.Remove(tmpfile.Name()) + // FIXME: handle or comment this possible error + _ = os.Remove(tmpfile.Name()) return err } err = os.Rename(tmpfile.Name(), filename) if err != nil { - os.Remove(tmpfile.Name()) + // FIXME: handle or comment this possible error + _ = os.Remove(tmpfile.Name()) return err } @@ -367,7 +366,8 @@ func bumpVersion(str string) string { } func (s *Store) PushBlueprint(bp blueprint.Blueprint, commitMsg string) { - s.change(func() error { + // FIXME: handle or comment this possible error + _ = s.change(func() error { commit, err := randomSHA1String() if err != nil { return err @@ -397,14 +397,16 @@ func (s *Store) PushBlueprint(bp blueprint.Blueprint, commitMsg string) { } func (s *Store) PushBlueprintToWorkspace(bp blueprint.Blueprint) { - s.change(func() error { + // FIXME: handle or comment this possible error + _ = s.change(func() error { s.Workspace[bp.Name] = bp return nil }) } func (s *Store) DeleteBlueprint(name string) { - s.change(func() error { + // FIXME: handle or comment this possible error + _ = s.change(func() error { delete(s.Workspace, name) delete(s.Blueprints, name) return nil @@ -412,13 +414,14 @@ func (s *Store) DeleteBlueprint(name string) { } func (s *Store) DeleteBlueprintFromWorkspace(name string) { - s.change(func() error { + // FIXME: handle or comment this possible error + _ = s.change(func() error { delete(s.Workspace, name) return nil }) } -func (s *Store) GetCompose(id uuid.UUID) (Compose, bool) { +func (s *Store) GetCompose(id uuid.UUID) (compose.Compose, bool) { s.mu.RLock() defer s.mu.RUnlock() @@ -426,24 +429,16 @@ func (s *Store) GetCompose(id uuid.UUID) (Compose, bool) { return compose, exists } -func (s *Store) GetAllComposes() map[uuid.UUID]Compose { +// GetAllComposes creates a deep copy of all composes present in this store +// and returns them as a dictionary with compose UUIDs as keys +func (s *Store) GetAllComposes() map[uuid.UUID]compose.Compose { s.mu.RLock() defer s.mu.RUnlock() - composes := make(map[uuid.UUID]Compose) - - for id, compose := range s.Composes { - newCompose := compose - newCompose.Targets = []*target.Target{} - - for _, t := range compose.Targets { - newTarget := *t - newCompose.Targets = append(newCompose.Targets, &newTarget) - } - - newBlueprint := *compose.Blueprint - newCompose.Blueprint = &newBlueprint + composes := make(map[uuid.UUID]compose.Compose) + for id, singleCompose := range s.Composes { + newCompose := singleCompose.DeepCopy() composes[id] = newCompose } @@ -457,11 +452,24 @@ func (s *Store) GetComposeResult(id uuid.UUID) (io.ReadCloser, error) { return os.Open(*s.stateDir + "/outputs/" + id.String() + "/result.json") } +func (s *Store) getImageLocationForLocalTarget(composeID uuid.UUID) string { + // This might result in conflicts because one compose can have multiple images, but (!) the Weldr API + // does not support multiple images per build and the RCM API does not support Local target, so it does + // not really matter any more. + return *s.stateDir + "/outputs/" + composeID.String() +} + func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, checksums map[string]string, arch, composeType string, size uint64, uploadTarget *target.Target) error { targets := []*target.Target{} + // Compatibility layer for image types in Weldr API v0 + imageType, exists := common.ImageTypeFromCompatString(composeType) + if !exists { + panic("fatal error, compose type does not exist") + } + if s.stateDir != nil { - outputDir := *s.stateDir + "/outputs/" + composeID.String() + outputDir := s.getImageLocationForLocalTarget(composeID) err := os.MkdirAll(outputDir, 0755) if err != nil { @@ -475,6 +483,8 @@ func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, checks )) } + size = s.distro.GetSizeForOutputType(composeType, size) + if uploadTarget != nil { targets = append(targets, uploadTarget) } @@ -484,35 +494,119 @@ func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, checks repos = append(repos, source.RepoConfig()) } - size = s.distro.GetSizeForOutputType(composeType, size) - - pipeline, err := s.distro.Pipeline(bp, repos, checksums, arch, composeType, size) + pipelineStruct, err := s.distro.Pipeline(bp, repos, checksums, arch, composeType, size) if err != nil { return err } - s.change(func() error { - s.Composes[composeID] = Compose{ - QueueStatus: "WAITING", - Blueprint: bp, - Pipeline: pipeline, - OutputType: composeType, - Targets: targets, - JobCreated: time.Now(), - Size: size, + // FIXME: handle or comment this possible error + _ = s.change(func() error { + s.Composes[composeID] = compose.Compose{ + Blueprint: bp, + ImageBuilds: []compose.ImageBuild{ + { + QueueStatus: common.IBWaiting, + Pipeline: pipelineStruct, + ImageType: imageType, + Targets: targets, + JobCreated: time.Now(), + Size: size, + }, + }, } return nil }) s.pendingJobs <- Job{ - ComposeID: composeID, - Distro: s.distro.Name(), - Pipeline: pipeline, - Targets: targets, - OutputType: composeType, + ComposeID: composeID, + ImageBuildID: 0, + Distro: s.distro.Name(), + Pipeline: pipelineStruct, + Targets: targets, + ImageType: composeType, } return nil } +// PushComposeRequest is an alternative to PushCompose which does not assume a pre-defined distro, as such it is better +// suited for RCM API and possible future API that would respect the fact that we can build any distro and any arch +func (s *Store) PushComposeRequest(request common.ComposeRequest) error { + // This should never happen and once distro.Pipeline is refactored this check will go away + arch, exists := request.Arch.ToString() + if !exists { + panic("fatal error, arch should exist but it does not") + } + distroString, exists := request.Distro.ToString() + if !exists { + panic("fatal error, distro should exist but it does not") + } + + distroStruct := s.distroRegistry.GetDistro(distroString) + if distroStruct == nil { + panic("fatal error, distro should exist but it is not in the registry") + } + + // This will be a list of imageBuilds that will be submitted to the state channel + imageBuilds := []compose.ImageBuild{} + newJobs := []Job{} + + // TODO: remove this + if len(request.RequestedImages) > 1 { + panic("Multiple image requests are not yet properly implemented") + } + + for n, imageRequest := range request.RequestedImages { + // TODO: handle custom upload targets + // TODO: this requires changes in the Compose Request struct + // TODO: implment support for no checksums + // This should never happen and once distro.Pipeline is refactored this check will go away + imgTypeCompatStr, exists := imageRequest.ImgType.ToCompatString() + if !exists { + panic("fatal error, image type should exist but it does not") + } + pipelineStruct, err := distroStruct.Pipeline(&request.Blueprint, request.Repositories, nil, arch, imgTypeCompatStr, 0) + if err != nil { + return err + } + + // This will make the job submission atomic: either all of them or none of them + newJobs = append(newJobs, Job{ + ComposeID: request.ComposeID, + ImageBuildID: n, + Distro: distroString, + Pipeline: pipelineStruct, + Targets: []*target.Target{}, + ImageType: imgTypeCompatStr, + }) + + imageBuilds = append(imageBuilds, compose.ImageBuild{ + Distro: request.Distro, + QueueStatus: common.IBWaiting, + ImageType: imageRequest.ImgType, + Pipeline: pipelineStruct, + Targets: []*target.Target{}, + JobCreated: time.Now(), + }) + } + + // submit all the jobs now + for _, job := range newJobs { + s.pendingJobs <- job + } + + // ignore error because the previous implementation does the same + _ = s.change(func() error { + s.Composes[request.ComposeID] = compose.Compose{ + Blueprint: &request.Blueprint, + ImageBuilds: imageBuilds, + } + return nil + }) + + return nil +} + +// DeleteCompose deletes the compose from the state file and also removes all files on disk that are +// associated with this compose func (s *Store) DeleteCompose(id uuid.UUID) error { return s.change(func() error { compose, exists := s.Composes[id] @@ -521,7 +615,14 @@ func (s *Store) DeleteCompose(id uuid.UUID) error { return &NotFoundError{} } - if compose.QueueStatus != "FINISHED" && compose.QueueStatus != "FAILED" { + // If any of the image builds have build artifacts, remove them + invalidRequest := true + for _, imageBuild := range compose.ImageBuilds { + if imageBuild.QueueStatus == common.IBFinished || imageBuild.QueueStatus == common.IBFailed { + invalidRequest = false + } + } + if invalidRequest { return &InvalidRequestError{fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id)} } @@ -529,92 +630,115 @@ func (s *Store) DeleteCompose(id uuid.UUID) error { var err error if s.stateDir != nil { - err = os.RemoveAll(*s.stateDir + "/outputs/" + id.String()) + // TODO: we need to rename the files as the compose will have multiple images + for _, imageBuild := range compose.ImageBuilds { + if imageBuild.QueueStatus == common.IBFinished || imageBuild.QueueStatus == common.IBFailed { + err = os.RemoveAll(s.getImageLocationForLocalTarget(id)) + if err != nil { + return err + } + } + } } return err }) } -func (s *Store) PopCompose() Job { +// PopJob returns a job from the job queue and changes the status of the corresponding image build to running +func (s *Store) PopJob() Job { job := <-s.pendingJobs - s.change(func() error { + // FIXME: handle or comment this possible error + _ = s.change(func() error { + // Get the compose from the map compose, exists := s.Composes[job.ComposeID] - if !exists || compose.QueueStatus != "WAITING" { + // Check that it exists + if !exists { panic("Invalid job in queue.") } - compose.JobStarted = time.Now() - compose.QueueStatus = "RUNNING" - for _, t := range compose.Targets { - t.Status = "RUNNING" + // Loop over the image builds and find the right one according to the image type + // Change queue status to running for the image build as well as for the targets + for n, imageBuild := range compose.ImageBuilds { + imgTypeCompatStr, exists := imageBuild.ImageType.ToCompatString() + if !exists { + panic("fatal error, image type should exist but it does not") + } + if imgTypeCompatStr != job.ImageType { + continue + } + if imageBuild.QueueStatus != common.IBWaiting { + panic("Invalid job in queue.") + } + compose.ImageBuilds[n].QueueStatus = common.IBRunning + for m := range compose.ImageBuilds[n].Targets { + compose.ImageBuilds[n].Targets[m].Status = common.IBRunning + } } + // Replace the compose struct with the new one + // TODO: I'm not sure this is needed, but I don't know what is the golang semantics in this case s.Composes[job.ComposeID] = compose return nil }) return job } -func (s *Store) UpdateCompose(composeID uuid.UUID, status string, image *Image, result *common.ComposeResult) error { +// UpdateImageBuildInCompose sets the status and optionally also the final image. +func (s *Store) UpdateImageBuildInCompose(composeID uuid.UUID, imageBuildID int, status common.ImageBuildState, image *compose.Image, result *common.ComposeResult) error { return s.change(func() error { - compose, exists := s.Composes[composeID] + // Check that the compose exists + currentCompose, exists := s.Composes[composeID] if !exists { return &NotFoundError{"compose does not exist"} } - if compose.QueueStatus == "WAITING" { + // Check that the image build was waiting + if currentCompose.ImageBuilds[imageBuildID].QueueStatus == common.IBWaiting { return &NotPendingError{"compose has not been popped"} } // write result into file if s.stateDir != nil && result != nil { - f, err := os.Create(*s.stateDir + "/outputs/" + composeID.String() + "/result.json") + f, err := os.Create(s.getImageLocationForLocalTarget(composeID) + "/result.json") if err != nil { return fmt.Errorf("cannot open result.json for job %v: %#v", composeID, err) } - json.NewEncoder(f).Encode(result) + // FIXME: handle error + _ = json.NewEncoder(f).Encode(result) } - switch status { - case "RUNNING": - switch compose.QueueStatus { - case "RUNNING": - default: - return &NotRunningError{"compose was not running"} - } - case "FINISHED", "FAILED": - switch compose.QueueStatus { - case "RUNNING": - compose.JobFinished = time.Now() - default: - return &NotRunningError{"compose was not running"} - } - compose.QueueStatus = status - for _, t := range compose.Targets { - t.Status = status - } - - if status == "FINISHED" { - compose.Image = image - } - - s.Composes[composeID] = compose - default: - return &InvalidRequestError{"invalid state transition"} + // Update the image build state including all target states + err := currentCompose.UpdateState(imageBuildID, status) + if err != nil { + // TODO: log error + return &InvalidRequestError{"invalid state transition: " + err.Error()} } + + // In case the image build is done, store the time and possibly also the image + if status == common.IBFinished || status == common.IBFailed { + currentCompose.ImageBuilds[imageBuildID].JobFinished = time.Now() + if status == common.IBFinished { + currentCompose.ImageBuilds[imageBuildID].Image = image + } + } + + s.Composes[composeID] = currentCompose + return nil }) } func (s *Store) PushSource(source SourceConfig) { - s.change(func() error { + // FIXME: handle or comment this possible error + _ = s.change(func() error { s.Sources[source.Name] = source return nil }) } func (s *Store) DeleteSource(name string) { - s.change(func() error { + // FIXME: handle or comment this possible error + _ = s.change(func() error { delete(s.Sources, name) return nil })