diff --git a/internal/weldr/store.go b/internal/store/store.go similarity index 76% rename from internal/weldr/store.go rename to internal/store/store.go index 1c46c2362..4da657c78 100644 --- a/internal/weldr/store.go +++ b/internal/store/store.go @@ -1,4 +1,6 @@ -package weldr +// Package store contains primitives for representing and changing the +// osbuild-composer state. +package store import ( "encoding/json" @@ -15,10 +17,12 @@ import ( "github.com/google/uuid" ) -type store struct { +// A Store contains all the persistent state of osbuild-composer, and is serialized +// on every change, and deserialized on start. +type Store struct { Blueprints map[string]blueprint.Blueprint `json:"blueprints"` Workspace map[string]blueprint.Blueprint `json:"workspace"` - Composes map[uuid.UUID]compose `json:"composes"` + Composes map[uuid.UUID]Compose `json:"composes"` mu sync.RWMutex // protects all fields pendingJobs chan<- job.Job @@ -26,7 +30,9 @@ type store struct { stateChannel chan<- []byte } -type compose struct { +// A Compose represent the task of building one image. It contains all the information +// necessary to generate the inputs for the job, as well as the job's state. +type Compose struct { QueueStatus string `json:"queue_status"` Blueprint *blueprint.Blueprint `json:"blueprint"` OutputType string `json:"output-type"` @@ -36,8 +42,15 @@ type compose struct { JobFinished time.Time `json:"job_finished"` } -func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job, jobUpdates <-chan job.Status) *store { - var s store +// An Image represents the image resulting from a compose. +type Image struct { + File *os.File + Name string + Mime string +} + +func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job, jobUpdates <-chan job.Status) *Store { + var s Store if initialState != nil { err := json.Unmarshal(initialState, &s) @@ -54,7 +67,7 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan< } if s.Composes == nil { // TODO: push waiting/running composes to workers again - s.Composes = make(map[uuid.UUID]compose) + s.Composes = make(map[uuid.UUID]Compose) } s.stateChannel = stateChannel s.pendingJobs = pendingJobs @@ -87,7 +100,7 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan< return &s } -func (s *store) change(f func()) { +func (s *Store) change(f func()) { s.mu.Lock() defer s.mu.Unlock() @@ -104,7 +117,7 @@ func (s *store) change(f func()) { } } -func (s *store) listBlueprints() []string { +func (s *Store) ListBlueprints() []string { s.mu.RLock() defer s.mu.RUnlock() @@ -117,7 +130,7 @@ func (s *store) listBlueprints() []string { return names } -type composeEntry struct { +type ComposeEntry struct { ID uuid.UUID `json:"id"` Blueprint string `json:"blueprint"` QueueStatus string `json:"queue_status"` @@ -126,21 +139,21 @@ type composeEntry struct { JobFinished float64 `json:"job_finished,omitempty"` } -func (s *store) listQueue(uuids []uuid.UUID) []*composeEntry { +func (s *Store) ListQueue(uuids []uuid.UUID) []*ComposeEntry { s.mu.RLock() defer s.mu.RUnlock() - newCompose := func(id uuid.UUID, compose compose) *composeEntry { + newCompose := func(id uuid.UUID, compose Compose) *ComposeEntry { switch compose.QueueStatus { case "WAITING": - return &composeEntry{ + return &ComposeEntry{ ID: id, Blueprint: compose.Blueprint.Name, QueueStatus: compose.QueueStatus, JobCreated: float64(compose.JobCreated.UnixNano()) / 1000000000, } case "RUNNING": - return &composeEntry{ + return &ComposeEntry{ ID: id, Blueprint: compose.Blueprint.Name, QueueStatus: compose.QueueStatus, @@ -148,7 +161,7 @@ func (s *store) listQueue(uuids []uuid.UUID) []*composeEntry { JobStarted: float64(compose.JobStarted.UnixNano()) / 1000000000, } case "FAILED", "FINISHED": - return &composeEntry{ + return &ComposeEntry{ ID: id, Blueprint: compose.Blueprint.Name, QueueStatus: compose.QueueStatus, @@ -161,14 +174,14 @@ func (s *store) listQueue(uuids []uuid.UUID) []*composeEntry { } } - var composes []*composeEntry + var composes []*ComposeEntry if uuids == nil { - composes = make([]*composeEntry, 0, len(s.Composes)) + composes = make([]*ComposeEntry, 0, len(s.Composes)) for id, compose := range s.Composes { composes = append(composes, newCompose(id, compose)) } } else { - composes = make([]*composeEntry, 0, len(uuids)) + composes = make([]*ComposeEntry, 0, len(uuids)) for _, id := range uuids { if compose, exists := s.Composes[id]; exists { composes = append(composes, newCompose(id, compose)) @@ -179,7 +192,7 @@ func (s *store) listQueue(uuids []uuid.UUID) []*composeEntry { return composes } -func (s *store) getBlueprint(name string, bp *blueprint.Blueprint, changed *bool) bool { +func (s *Store) GetBlueprint(name string, bp *blueprint.Blueprint, changed *bool) bool { s.mu.RLock() defer s.mu.RUnlock() @@ -214,7 +227,7 @@ func (s *store) getBlueprint(name string, bp *blueprint.Blueprint, changed *bool return true } -func (s *store) getBlueprintCommitted(name string, bp *blueprint.Blueprint) bool { +func (s *Store) GetBlueprintCommitted(name string, bp *blueprint.Blueprint) bool { s.mu.RLock() defer s.mu.RUnlock() @@ -241,38 +254,38 @@ func (s *store) getBlueprintCommitted(name string, bp *blueprint.Blueprint) bool return true } -func (s *store) pushBlueprint(bp blueprint.Blueprint) { +func (s *Store) PushBlueprint(bp blueprint.Blueprint) { s.change(func() { delete(s.Workspace, bp.Name) s.Blueprints[bp.Name] = bp }) } -func (s *store) pushBlueprintToWorkspace(bp blueprint.Blueprint) { +func (s *Store) PushBlueprintToWorkspace(bp blueprint.Blueprint) { s.change(func() { s.Workspace[bp.Name] = bp }) } -func (s *store) deleteBlueprint(name string) { +func (s *Store) DeleteBlueprint(name string) { s.change(func() { delete(s.Workspace, name) delete(s.Blueprints, name) }) } -func (s *store) deleteBlueprintFromWorkspace(name string) { +func (s *Store) DeleteBlueprintFromWorkspace(name string) { s.change(func() { delete(s.Workspace, name) }) } -func (s *store) addCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) { +func (s *Store) AddCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) { targets := []*target.Target{ target.NewLocalTarget(target.NewLocalTargetOptions("/var/lib/osbuild-composer/outputs/" + composeID.String())), } s.change(func() { - s.Composes[composeID] = compose{ + s.Composes[composeID] = Compose{ QueueStatus: "WAITING", Blueprint: bp, OutputType: composeType, @@ -287,13 +300,7 @@ func (s *store) addCompose(composeID uuid.UUID, bp *blueprint.Blueprint, compose } } -type image struct { - file *os.File - name string - mime string -} - -func (s *store) getImage(composeID uuid.UUID) (*image, error) { +func (s *Store) GetImage(composeID uuid.UUID) (*Image, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -307,10 +314,10 @@ func (s *store) getImage(composeID uuid.UUID) (*image, error) { case *target.LocalTargetOptions: file, err := os.Open(options.Location + "/" + name) if err == nil { - return &image{ - file: file, - name: name, - mime: mime, + return &Image{ + File: file, + Name: name, + Mime: mime, }, nil } } diff --git a/internal/weldr/api.go b/internal/weldr/api.go index 335d6bf84..1b9056c02 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -16,10 +16,11 @@ import ( "osbuild-composer/internal/blueprint" "osbuild-composer/internal/job" "osbuild-composer/internal/rpmmd" + "osbuild-composer/internal/store" ) type API struct { - store *store + store *store.Store repo rpmmd.RepoConfig packages rpmmd.PackageList @@ -32,7 +33,7 @@ func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, // This needs to be shared with the worker API so that they can communicate with each other // builds := make(chan queue.Build, 200) api := &API{ - store: newStore(initialState, stateChannel, jobs, jobStatus), + store: store.New(initialState, stateChannel, jobs, jobStatus), repo: repo, packages: packages, logger: logger, @@ -40,7 +41,7 @@ func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, // sample blueprint on first run if initialState == nil { - api.store.pushBlueprint(blueprint.Blueprint{ + api.store.PushBlueprint(blueprint.Blueprint{ Name: "example", Description: "An Example", Version: "1", @@ -385,7 +386,7 @@ func (api *API) blueprintsListHandler(writer http.ResponseWriter, request *http. return } - names := api.store.listBlueprints() + names := api.store.ListBlueprints() total := uint(len(names)) offset = min(offset, total) limit = min(limit, total-offset) @@ -420,7 +421,7 @@ func (api *API) blueprintsInfoHandler(writer http.ResponseWriter, request *http. for _, name := range names { var blueprint blueprint.Blueprint var changed bool - if !api.store.getBlueprint(name, &blueprint, &changed) { + if !api.store.GetBlueprint(name, &blueprint, &changed) { statusResponseError(writer, http.StatusNotFound) return } @@ -454,7 +455,7 @@ func (api *API) blueprintsDepsolveHandler(writer http.ResponseWriter, request *h blueprints := []entry{} for _, name := range names { var blueprint blueprint.Blueprint - if !api.store.getBlueprint(name, &blueprint, nil) { + if !api.store.GetBlueprint(name, &blueprint, nil) { statusResponseError(writer, http.StatusNotFound) return } @@ -513,7 +514,7 @@ func (api *API) blueprintsDiffHandler(writer http.ResponseWriter, request *http. // Fetch old and new blueprint details from store and return error if not found var oldBlueprint, newBlueprint blueprint.Blueprint - if !api.store.getBlueprintCommitted(name, &oldBlueprint) || !api.store.getBlueprint(name, &newBlueprint, nil) { + if !api.store.GetBlueprintCommitted(name, &oldBlueprint) || !api.store.GetBlueprint(name, &newBlueprint, nil) { statusResponseError(writer, http.StatusNotFound) return } @@ -563,7 +564,7 @@ func (api *API) blueprintsNewHandler(writer http.ResponseWriter, request *http.R return } - api.store.pushBlueprint(blueprint) + api.store.PushBlueprint(blueprint) statusResponseOK(writer) } @@ -582,18 +583,18 @@ func (api *API) blueprintsWorkspaceHandler(writer http.ResponseWriter, request * return } - api.store.pushBlueprintToWorkspace(blueprint) + api.store.PushBlueprintToWorkspace(blueprint) statusResponseOK(writer) } func (api *API) blueprintDeleteHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) { - api.store.deleteBlueprint(params.ByName("blueprint")) + api.store.DeleteBlueprint(params.ByName("blueprint")) statusResponseOK(writer) } func (api *API) blueprintDeleteWorkspaceHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) { - api.store.deleteBlueprintFromWorkspace(params.ByName("blueprint")) + api.store.DeleteBlueprintFromWorkspace(params.ByName("blueprint")) statusResponseOK(writer) } @@ -631,10 +632,10 @@ func (api *API) composeHandler(writer http.ResponseWriter, httpRequest *http.Req bp := blueprint.Blueprint{} changed := false - found := api.store.getBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed? + found := api.store.GetBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed? if found { - api.store.addCompose(reply.BuildID, &bp, cr.ComposeType) + api.store.AddCompose(reply.BuildID, &bp, cr.ComposeType) } else { statusResponseError(writer, http.StatusBadRequest, "blueprint does not exist") return @@ -662,14 +663,14 @@ func (api *API) composeTypesHandler(writer http.ResponseWriter, request *http.Re func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) { var reply struct { - New []*composeEntry `json:"new"` - Run []*composeEntry `json:"run"` + New []*store.ComposeEntry `json:"new"` + Run []*store.ComposeEntry `json:"run"` } - reply.New = make([]*composeEntry, 0) - reply.Run = make([]*composeEntry, 0) + reply.New = make([]*store.ComposeEntry, 0) + reply.Run = make([]*store.ComposeEntry, 0) - for _, entry := range api.store.listQueue(nil) { + for _, entry := range api.store.ListQueue(nil) { switch entry.QueueStatus { case "WAITING": reply.New = append(reply.New, entry) @@ -683,7 +684,7 @@ func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Re func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) { var reply struct { - UUIDs []*composeEntry `json:"uuids"` + UUIDs []*store.ComposeEntry `json:"uuids"` } uuidStrings := strings.Split(params.ByName("uuids"), ",") @@ -696,7 +697,7 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R } uuids = append(uuids, id) } - reply.UUIDs = api.store.listQueue(uuids) + reply.UUIDs = api.store.ListQueue(uuids) json.NewEncoder(writer).Encode(reply) } @@ -709,23 +710,23 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re return } - image, err := api.store.getImage(id) + image, err := api.store.GetImage(id) if err != nil { statusResponseError(writer, http.StatusNotFound, "image for compose not found") return } - stat, err := image.file.Stat() + stat, err := image.File.Stat() if err != nil { statusResponseError(writer, http.StatusInternalServerError) return } - writer.Header().Set("Content-Disposition", "attachment; filename="+id.String()+"-"+image.name) - writer.Header().Set("Content-Type", image.mime) + writer.Header().Set("Content-Disposition", "attachment; filename="+id.String()+"-"+image.Name) + writer.Header().Set("Content-Type", image.Mime) writer.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size())) - io.Copy(writer, image.file) + io.Copy(writer, image.File) } func (api *API) composeFinishedHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {